为了避免远程调用接口的信息过于敏感直接对外部暴露[比如查询用户密码密文],可能需要对一些远程调用接口做权限控制,只允许特定的用户角色访问,让某个接口只限于系统内部使用,不被外部访问,常用的限制方式如下,以下方法可以单独使用,也可以组合使用,在实际应用中需要根据业务需求和安全策略来选择最合适的方法。
使用Spring Security进行权限控制:
配置Spring Security,为特定的接口添加权限控制,只允许特定的用户角色访问
[配置实例]
在这个例子中,/internal/**
路径下的接口只有具有"INTERNAL"
角色的用户才能访问
protected void configure(HttpSecurity http) throws Exception {
http
.authorizeRequests()
.antMatchers("/admin/**").hasRole("ADMIN")
.antMatchers("/internal/**").hasRole("INTERNAL")
.anyRequest().authenticated()
.and()
.httpBasic();
}
使用IP白名单:
如果你希望接口只能被特定的IP地址访问,你可以在Spring Security配置中添加IP白名单。
[配置实例]
在这个例子中,只有来自IP地址192.168.1.100
的请求才能访问/internal/**
路径下的接口。
xxxxxxxxxx
protected void configure(HttpSecurity http) throws Exception {
http
.authorizeRequests()
.antMatchers("/internal/**").permitAll()
.anyRequest().authenticated()
.and()
.ip()
.access("hasIpAddress('192.168.1.100')")
.and()
.httpBasic();
}
使用Spring Cloud Gateway或Zuul作为API网关:
在API网关层面控制访问权限,只允许内部服务访问特定的路由。
[配置实例]
在这个例子中,/internal/**
路径下的接口被配置在Spring Cloud Gateway中,并且可以进一步配置安全规则来限制访问。
xxxxxxxxxx
spring
cloud
gateway
routes
id internal-service
uri lb //internal-service
predicates
Path=/internal/**
filters
RewritePath=/internal/(?<segment>.*), /$\segment
security
oauth2ResourceServer()
在服务层面进行控制:
在服务代码中添加逻辑,检查请求的来源,例如通过特定的HTTP头或者请求参数来判断请求是否来自内部系统。
[配置实例]
在这个例子中,只有带有X-Internal-Request
头且值为true
的请求才能访问/internal/data
接口
xxxxxxxxxx
public class InternalController {
"/internal/data") (
public ResponseEntity<?> getInternalData( ("X-Internal-Request") String internalRequest) {
if (!"true".equals(internalRequest)) {
return ResponseEntity.status(HttpStatus.FORBIDDEN).build();
}
// 处理请求
}
}
使用内部网络和防火墙规则:
将你的服务部署在内部网络中,并通过防火墙规则限制外部访问。这是一种常见的做法,可以确保只有内部网络中的服务可以访问特定的接口。
定时任务是常用的操作,比如每天的支付宝账单对账、每个月的财务汇总,系统闲时定时统计当前系统中重要数据的操作
常见的定时调度任务实现方法有:基于Java
原生Timer
、基于延迟消息队列、基于Quartz
任务调度框架、基于xxl-job
、基于netty
时间轮实现的io.netty.util.HashedWheelTimer
、SpringBoot
的定时任务注解@Scheduled
基于JDK的java.util.Timer
的定时任务实现
🔎:从JDK1.3起就有一个java.util.Timer
类来实现延时、定时执行任务的功能;Timer
的优点是简单易用,但是致命缺点是所有的任务都是被同一个线程执行的,同一时间只能有一个任务执行,剩下任务都得等,执行过程中如果有一个任务发生延迟或者异常都会影响后续其他任务的执行,异常甚至会导致后续任务作废,适合任务数不确定的短时定时任务、定时任务需要单独控制的场景比如分布式锁的有效时间续期,早期的Redisson
框架就是基于Timer
做的定时锁过期时间重置,随着版本更新也换成了netty
的的时间轮实现io.netty.util.HashedWheelTimer
java.util.Timer
的使用
void--->timer.schedule(TimerTask task, long delay, long period)
功能解析:定义在定时任务线程创建以后在delay
毫秒以后开始间隔period
时间执行一次的定时任务,task是任务对象,TimeTask是抽象类
,需要实现抽象run方法;delay是相对于定时调度线程启动时的初始延迟时间,单位是毫秒,period是两次执行的间隔时间,单位是毫秒
使用示例:
xxxxxxxxxx
/**
* Thread[main,5,main]| 定时任务初始时间:172418119 1863
* Thread[Timer-0,5,main]| 定时任务执行时间:17241811 96874
* Thread[Timer-0,5,main]| 定时任务执行时间:17241812 06887
* Thread[Timer-0,5,main]| 定时任务执行时间:17241812 16889
* Thread[Timer-0,5,main]| 定时任务执行时间:17241812 26904
* */
public static void main(String[] args) {
System.out.println(Thread.currentThread()+"| 定时任务初始时间:"+System.currentTimeMillis());
new Timer().schedule(new TimerTask() {
public void run() {
System.out.println(Thread.currentThread()+"| 定时任务执行时间:"+System.currentTimeMillis());
}
},5000,10000);
}
示例含义:定时线程启动后初始延时5秒钟开始每隔十秒打印定时任务执行时间
补充说明:
🔎:TimerTask
实现了Runnable
接口,是一个抽象类,实例化对象需要实现其中的抽象run方法
void--->schedule(TimerTask task, long delay)
功能解析:定义在定时任务线程创建以后在delay
毫秒以后开始只执行一次的定时任务,task是任务对象,TimeTask是抽象类
,需要实现抽象run方法;delay是相对于定时调度线程启动时的初始延迟时间,单位是毫秒,执行完一次定时任务线程就会销毁
使用示例:
xpublic class DistributedRedisLock implements Lock {
private StringRedisTemplate redisTemplate;
private String lockName;
private String hSetField;
private long expire = 30;
public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {
this.redisTemplate = redisTemplate;
this.lockName = lockName;
this.hSetField = uuid + ":" + Thread.currentThread().getId();
}
public void lock() {
this.tryLock();
}
public void lockInterruptibly() throws InterruptedException {
}
//无参tryLock方法是使用默认有效时间30s作为锁的有效时间
public boolean tryLock() {
try {
return this.tryLock(-1L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
/**
* 加锁方法
* @param time
* @param unit
* @return
* @throws InterruptedException
*/
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (time != -1){
this.expire = unit.toSeconds(time);
}
String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
"then " +
" redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
" redis.call('expire', KEYS[1], ARGV[2]) " +
" return 1 " +
"else " +
" return 0 " +
"end";
while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), hSetField, String.valueOf(expire))){
Thread.sleep(50);
}
// 加锁成功,返回之前,开启定时器自动续期
this.renewExpire();
return true;
}
/**
* 解锁方法
*/
public void unlock() {
String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +
"then " +
" return nil " +
"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +
"then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), hSetField);
if (flag == null){
throw new IllegalMonitorStateException("this lock doesn't belong to you!");
}
}
public Condition newCondition() {
return null;
}
//上锁以后延迟1/3锁有效时间去定时执行续期脚本;注意execute方法会自动将lua的返回值1转成true,将返回值0返回false,注意nil也会转成false;如果需要区分nil需要使用Long类型的返回值,对应nil转成null;
//定时任务采用延时执行一次任务,执行完以后线程就销毁,如果续期成功再次执行该任务,续期失败说明锁已经不是当前线程的锁了,不再进行续期操作;如果不这么写就需要去解锁成功以后取消定时任务,需要考虑各种意外情况导致定时线程无法被取消导致的死锁的情况,实现起来比较复杂,但是这种实现在分布式锁比较多的情况下会占用很多的线程资源
//这里更改此前的getId方法是因为定时续期的execute方法也需要获取field字段,但是不能使用getId中的逻辑,因为定时任务需要开新的定时任务线程,但是field字段需要业务线程的线程id,因此这里只是改进将getId的逻辑放到了获取锁时的实例化锁对象的构造方法中,实际上锁重入只是key和Field字段相同,实际上锁重入并不是同一个DistributedRedisLock对象
private void renewExpire(){
String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
"then " +
" return redis.call('expire', KEYS[1], ARGV[2]) " +
"else " +
" return 0 " +
"end";
new Timer().schedule(new TimerTask() {
public void run() {
if (redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), hSetField, String.valueOf(expire))) {
renewExpire();
}
}
}, this.expire * 1000 / 3);
}
}
示例含义:DistributedRedisLock
是基于Redis
+java.util.Timer
+lua脚本
来实现自动续期的可重入分布式锁实现,其中的renewExpire()
方法就是使用Timer实现分布式锁自动续期的业务逻辑代码,这里面的又上一个续期任务执行成功唤醒下一个定时续期任务的思想值得借鉴,这样可以避免出现意外导致定时任务停不下来
补充说明:
🔎:该方法一般用在控制定时任务线程需要满足一定条件才能继续执行避免定时任务因为意外情况无法自动停止的场景,比如分布式锁的自动续期
订单业务逻辑
带回滚的锁库存逻辑
数据库mall-sms
中的表sms_stock_order_task
记录着当前那个订单正在锁库存,在表sms_stock_order_task_detail
表中记录着商品id、锁定库存的数量、锁定库存所在仓库id、订单任务号;即锁库存的时候先给数据库保存要锁定的库存记录,然后再锁定库存;只要锁定库存成功,库存相关的三张表因为本地事务都会成功保存锁库存记录;如果锁失败了数据库因为事务不会有锁库存记录,库存也不会锁定成功
这样库存锁定表中存在的就是下单成功锁定库存的记录和下单失败但是锁定库存成功的记录,我们可以考虑使用一个定时任务,每隔一段时间就扫描一次数据库,检查一下哪些订单没有创建但是有锁定库存的记录,把这些锁定库存记录拿出来重新把库存补偿回滚一下,但是使用定时任务来定期扫描整个数据库表是很麻烦的一件事,我们通过引入延时消息队列来实现定时功能
延时队列的原理是当库存锁定成功以后我们将库存锁定成功的消息发送给延时队列,但是在一定时间内消息被暂存在延时队列中不要往外发送,即锁定库存成功的消息暂存在延时队列中一段时间,在订单支付时间过期以后我们将该消息发送给解锁库存服务,解锁库存服务去检查订单是否被取消,如果订单根本没有创建或者因为订单未支付而被自动取消了就去数据库根据对应的锁库存记录将库存解锁,即锁定的库存在订单最大失效时间以后才固定进行解锁,通过延时消息队列来控制这个定时功能
延时队列锁库存的业务流程
1️⃣:创建订单锁定库存成功后,给主题交换器stock-event-exchange
发送库存工作单消息,消息的路由键stock.locked
,被主题交换器路由到队列stock.delay.queue
中等待50分钟过期时间
2️⃣:库存工作单在消息存活时间到期以后,队列将消息的路由键更改为stock.release
,通过主题交换器stock-event-exchange
将消息路由到队列stock.release.stock.queue
,由该队列将消息转发给消费者库存服务
3️⃣:消费者库存服务收到库存工作单消息,检查订单服务对应订单的状态,如果订单未被成功创建或者订单未支付就解锁被锁定的库存
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 库存服务RabbitMQ配置
* @创建日期 2024/12/02
* @since 1.0.0
*/
public class StockRabbitMQConfig {
/**
* @return {@link MessageConverter }
* @描述 给容器中注入一个使用Jackson将消息对象序列化为json对象的消息转换器
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/01
* @since 1.0.0
*/
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* @return {@link Queue }
* @描述 库存延迟队列延时队列
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Queue stockDelayQueue(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange","stock-event-exchange");
arguments.put("x-dead-letter-routing-key","stock.release");
arguments.put("x-message-ttl",120000);
return new Queue("stock.delay.queue", true, false, false, arguments);
}
/**
* @return {@link Queue }
* @描述 库存延迟队列路由队列
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Queue stockReleaseStockQueue(){
return new Queue("stock.release.stock.queue",true,false,false);
}
/**
* @return {@link Exchange }
* @描述 库存服务通用主题交换器
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Exchange stockEventExchange(){
return new TopicExchange("stock-event-exchange",true,false);
}
/**
* @return {@link Binding }
* @描述 延迟队列的延时队列stock.delay.queue和库存服务通用交换器stock-event-exchange的绑定关系
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Binding orderCreateOrderBinding(){
return new Binding("stock.delay.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.locked",
null);
}
/**
* @return {@link Binding }
* @描述 延迟队列的路由队列stock.release.stock.queue和库存服务通用交换器stock-event-exchange的绑定关系
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Binding orderReleaseOrderBinding(){
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.release.#",
null);
}
}
解锁库存逻辑
需要解锁库存的场景:
创建订单成功,订单过期没有支付被系统自动取消或者订单被用户手动取消时需要解锁库存
创建订单过程中,远程调用库存服务锁定库存成功,但是调用其他服务时出现异常导致创建订单整个业务回滚,之前成功锁定的库存就需要自动解锁来实现回滚,使用Seata
分布式事务性能太差,不适合下单这种高并发场景;基于柔性事务的可靠消息加最终一致性的分布式事务方案,在保证分布式事务下的性能同时,允许一定时间内的软一致性并确保库存数据的最终一致性
只要库存锁定成功就给RabbitMQ中对应的库存延迟队列发送库存工作单消息,使用RabbitTemplate.convertAndSend()
发送锁定库存消息,同时锁定库存以前我们要保存库存工作单信息[对应表wms_ware_order_task
,保存订单Id、订单号],锁定存库成功以后要保存库存工作单详情[对应表wms_ware_order_task_detail
,给该表添加bigint
类型字段ware_id
锁定库存所在仓库id;int
类型的lock_status
,其中1表示已锁定、2表示已解锁、3表示已扣减;注意MP更改了字段需要更改相应的Mapper文件中的resultMap
标签;保存商品sku
、商品数量、库存工作单id、锁定库存所在仓库id、默认锁定状态是已锁定1],锁定库存前先保存库存工作单,保存库存工作单是为了追溯锁定库存信息
给消息队列发送的消息实体类直接写在common包下,该消息对象StockLockedTo
保存库存工作单id、该工作单下所有工作单详情id列表,老师这里发送消息的时机错了,所有商品都锁定成了才给消息队列发送消息,否则本地事务会自动回滚,老师是锁定一个商品就发送一条消息,如果事务回滚了发出去的消息就撤不回来了,而且老师这里发送的消息是全量的库存工作单详情数据
[消息]
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 库存被成功锁定消息
* @创建日期 2024/12/02
* @since 1.0.0
*/
public class StockLockedMessage {
/**
* 库存工作单
*/
private WareOrderTaskEntity wareOrderTask;
/**
* 库存工作单详情列表
*/
private List<WareOrderTaskDetailEntity> wareOrderTaskDetail;
}
[锁定库存保存库存工作单并发送消息]
xxxxxxxxxx
/**
* @param lockStock
* @return {@link List }<{@link LockStockResultTo }>
* @描述 根据订单和订单项数据锁定库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/01
* @since 1.0.0
*/
public List<LockStockResultTo> lockStock(LockStockTo lockStock) throws RRException{
//2. 锁定库存前创建库存工作单
WareOrderTaskEntity wareOrderTask = new WareOrderTaskEntity();
BeanUtils.copyProperties(lockStock,wareOrderTask);
wareOrderTaskService.save(wareOrderTask);
//1. 锁定库存
List<LockStockTo.OrderItem> orderItems = lockStock.getOrderItems();
List<Long> skuIds = orderItems.stream().map(LockStockTo.OrderItem::getSkuId).collect(Collectors.toList());
Map<Long, LockStockTo.OrderItem> orderItemOfSku = orderItems.stream().collect(Collectors.toMap(LockStockTo.OrderItem::getSkuId, orderItem -> orderItem));
//查询商品所在的全部仓库
List<WareIdsOfSkuIdTo> wareIdsOfSkuIdTos=baseMapper.getWareBySkuIds(skuIds);
if(wareIdsOfSkuIdTos.size()!=skuIds.size()){
throw new RRException(StatusCode.NO_STOCK_EXCEPTION.getMsg(),
StatusCode.NO_STOCK_EXCEPTION.getCode());
}
ArrayList<LockStockResultTo> lockStockResults = new ArrayList<>();
ArrayList<WareOrderTaskDetailEntity> wareOrderTaskDetails = new ArrayList<>();
for (WareIdsOfSkuIdTo wareIdsOfSkuIdTo : wareIdsOfSkuIdTos) {
Long skuId = wareIdsOfSkuIdTo.getSkuId();
String[] wareIds = wareIdsOfSkuIdTo.getWareIds().split(",");
Boolean skuLocked = false;
for (String wareId : Arrays.asList(wareIds)) {
if(baseMapper.tryLockStock(wareId,skuId,orderItemOfSku.get(skuId).getSkuQuantity())==1){
//准备锁定库存的响应数据
LockStockResultTo lockStockResult = new LockStockResultTo();
skuLocked = true;
lockStockResult.setLocked(true);
lockStockResult.setLockQuantity(orderItemOfSku.get(skuId).getSkuQuantity());
lockStockResult.setWareId(Long.parseLong(wareId));
lockStockResult.setSkuId(skuId);
lockStockResults.add(lockStockResult);
//准备库存工作单详情
WareOrderTaskDetailEntity wareOrderTaskDetail = new WareOrderTaskDetailEntity();
wareOrderTaskDetail.setTaskId(wareOrderTask.getId());
wareOrderTaskDetail.setSkuId(skuId);
wareOrderTaskDetail.setSkuName(orderItemOfSku.get(skuId).getSkuName());
wareOrderTaskDetail.setSkuNum(orderItemOfSku.get(skuId).getSkuQuantity());
wareOrderTaskDetail.setWareId(Long.parseLong(wareId));
wareOrderTaskDetails.add(wareOrderTaskDetail);
break;
}
}
if(!skuLocked){
throw new RRException("商品"+skuId+StatusCode.NO_STOCK_EXCEPTION.getMsg(),
StatusCode.NO_STOCK_EXCEPTION.getCode());
}
}
//for (WareOrderTaskDetailEntity wareOrderTaskDetail : wareOrderTaskDetails) {
// wareOrderTaskDetailService.save(wareOrderTaskDetail);
//}
wareOrderTaskDetailService.saveBatch(wareOrderTaskDetails);
StockLockedMessage stockLockedMessage = new StockLockedMessage();
stockLockedMessage.setWareOrderTask(wareOrderTask);
stockLockedMessage.setWareOrderTaskDetail(wareOrderTaskDetails);
rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",stockLockedMessage);
return lockStockResults;
}
使用@RabbitHandler
监听锁定库存消息队列,获取到消息对象,按以下情况执行解锁库存逻辑
1️⃣:创建订单过程中,库存锁定成功,但是接下来创建订单出现问题,整个订单回滚,被锁定的库存需要自动解锁
只要库存工作单详情存在,就说明库存锁定成功,此时我们就要看订单状态来判断是否需要解锁库存,如果订单都没有说明订单没有被成功创建,此时就要使用库存工作单详情来解锁库存
如果有订单查看订单状态,如果订单已经被取消就解锁库存,只要订单没有被取消就不能解锁库存,订单被取消字段status
等于4
根据库存工作单的id去订单服务查询订单实体类,如果订单不存在或者订单的status
字段为4就调用unLockStock
方法解锁库存
2️⃣:订单创建失败是由于库存锁定失败导致的
库存工作单数据没有是库存本地事务整体回滚导致的,库存工作单记录不会创建,锁定库存操作也会全部自动回滚,这种情况无需解锁
解锁库存需要知道商品的skuId
、锁定库存所在仓库id、锁定库存数量、库存工作单详情id
,解锁就是将原来的增加的锁定库存的字段再减掉UPDATE wms_ware_sku SET stock_locked = stock_locked + #{num} WHERE sku_id=#{skuId} AND ware_id = #{wareId}
🚁:自动应答的消息队列,一旦在消息的消费过程中出现异常导致消息无法被正常消费,消息就丢失了,比如Feign远程调用网络闪断,或者远程服务的Feign调用必须携带用户的登录状态但是实际请求没有携带用户登录状态被远程服务拦截,抛出异常终止后续方法执行,此时消息就彻底丢失了
📓:使用配置spring.rabbitmq.listener.simple.acknowledge-mode=manual
开启消息接收手动应答,在订单解锁成功以后使用方法channel.basicAck(message.getMessageProperties().getDeliveryTag(),false)
来做消息接收手动应答,解锁成功立马手动应答,无需解锁什么也不用做立马手动应答,只要在应答后发生异常也不会导致消息丢失;如果远程调用没有成功返回在库存解锁以前出现问题,我们使用方法channel.basicReject(message.getMessageProperties().getDeliveryTag(),true)
来手动拒绝消息并将消息重新放到队列中,给别人继续消费消息解锁库存的机会,比如由于分区故障为了保证一致性部分服务不可用
❓:订单服务所有远程调用请求都要求有登录状态,但是我们的消息队列监听方法的远程调用不可能带用户登录状态,因此我们需要在订单服务的拦截器中放行所有消息队列监听方法调用的订单服务远程接口,特别注意,这个需要被放行的请求路径还拼接了请求参数导致请求路径是动态的
🔑:我们通过在拦截器中放行指定URI的请求来实现这个目的,对于URI是变化的我们使用Spring
提供的boolean match = AntPathMatcher.match("/order/order/status/**",request.getRequestURI())
[HttpServletRequest.getRequestURI()
是获取请求路径的URI,HttpServletRequest.getRequestURL()
是获取请求路径的URL],如果请求URI匹配我们需要的格式就直接通过拦截器的return true
放行,无需再进行用户登录状态检查
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 用户登录状态拦截器
* @创建日期 2024/11/09
* @since 1.0.0
*/
public class LoginStatusInterceptor implements HandlerInterceptor {
public static ThreadLocal<UserBaseInfoVo> loginUser=new ThreadLocal<>();
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
AntPathMatcher antPathMatcher = new AntPathMatcher();
if(antPathMatcher.match("/order/order/get/**", request.getRequestURI())){
return true;
}
UserBaseInfoVo attribute = (UserBaseInfoVo)request.getSession().getAttribute(MallConstant.SESSION_USER_LOGIN_STATUS_KEY);
if(attribute!=null){
loginUser.set(attribute);
return true;
}else{
request.getSession().setAttribute("tip","请先登录");
response.sendRedirect("http://auth.earlmall.com/login.html");
return false;
}
}
}
专门抽取一个消息队列的监听器Service来处理消息队列中的消息
在类上标注@RabbitListener(queues="stock.release.stock.queue")
来监听指定队列,在类上标注@Service
注解将该类的实例化对象作为容器组件,在具体的方法上标注注解@RabbitHandler
,在该方法中调用库存服务实现的解锁库存逻辑,解锁库存的方法出现任何异常都手动拒绝消息并重新入队列,只要解锁库存方法成功调用就手动应答接收消息,远程调用如果状态码不是0说明没有查到对应订单的实体类,此时直接抛异常执行拒绝接收消息的逻辑
[监听队列消息]
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 库存释放队列监听器
* @创建日期 2024/12/02
* @since 1.0.0
*/
queues = "stock.release.stock.queue") (
public class StockReleaseQueueListenerImpl extends ServiceImpl<MessageDao, MessageEntity> implements StockReleaseQueueListener {
private WareSkuService wareSkuService;
/**
* @param message
* @param stockLockedMessage
* @param channel
* @描述 根据到期的锁定库存消息释放库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/02
* @since 1.0.0
*/
public void tryReleaseStock(Message message, StockLockedMessage stockLockedMessage, Channel channel) {
try{
wareSkuService.tryReleaseStock(stockLockedMessage.getWareOrderTask().getOrderSn());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
} catch (Exception exception) {
MessageEntity messageLog = new MessageEntity();
messageLog.setMessageStatus(2);
messageLog.setClassType(stockLockedMessage.getClass().getTypeName());
messageLog.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
messageLog.setToExchange(message.getMessageProperties().getReceivedExchange());
messageLog.setContent(JSON.toJSONString(stockLockedMessage));
save(messageLog);
}
}
}
}
[解锁库存]
xxxxxxxxxx
/**
* @param orderSn
* @描述 根据库存锁定消息尝试解锁库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/02
* @since 1.0.0
*/
public void tryReleaseStock(String orderSn) {
//1. 根据订单号远程查询订单服务对应的订单状态,如果订单不存在或者订单状态不是已关闭就不解锁库存
R res = orderFeignClient.getOrderByOrderSn(orderSn);
OrderTo order = res.get("order", new TypeReference<OrderTo>() {});
if(order==null && order.getStatus()==4){
return;
}
//2. 根据订单号查询库存工作单id
WareOrderTaskEntity wareOrderTaskEntity = wareOrderTaskService.getOne(new QueryWrapper<WareOrderTaskEntity>().
eq("order_sn", orderSn));
if(wareOrderTaskEntity==null){
return;
}
List<WareOrderTaskDetailEntity> wareOrderTaskDetails = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>().
eq("task_id", wareOrderTaskEntity.getId()));
for (WareOrderTaskDetailEntity wareOrderTaskDetail : wareOrderTaskDetails) {
//检索库存工作单状态,只有库存工作单状态为未解锁时才解锁库存,如果不为未解锁本条库存不解锁
if(wareOrderTaskDetail.getLockStatus()!=2){
baseMapper.releaseStock(wareOrderTaskDetail.getSkuNum(),wareOrderTaskDetail.getSkuId(),wareOrderTaskDetail.getWareId());
//解锁后将库存工作单状态更改为已解锁
wareOrderTaskDetail.setLockStatus(2);
wareOrderTaskDetailService.updateById(wareOrderTaskDetail);
}
}
}
解锁库存成功后通过库存工作单详情id将库存工作单详情的状态lock_status
更改为已解锁2,增加前面解锁库存的条件只有库存工作单详情为已锁定状态且需要解锁时才能解锁库存
带回滚的锁库存实现
给库存服务mall-ware
引入、配置RabbitMQ
并在主启动类上标注@EnableRabbit
开启RabbitMQ
功能
配置RabbitMQ
的消息的JSON
序列化机制
给库存服务添加一个默认交换器stock-event-exchange
交换器使用Topic
交换器类型,因为该交换器需要绑定多个队列,而且还需要使用对不同消息的路由键进行模糊匹配的功能
给库存服务添加一个释放库存队列stock.release.stock.queue
,支持持久化,不支持排他和自动删除,普通队列不需要设置参数
给库存服务添加一个延迟库存工作单消息的队列stock.delay.queue
,给该延时队列设置死信交换器stock-event-exchange
,设置死信的路由键为stock.release
,设置队列的消息存活时间为120秒[方便测试用的,比验证订单创建的延迟队列多一分钟],支持持久化,不支持排他和自动删除
给库存服务的库存释放队列和交换器添加一个绑定关系,绑定目的地stock.release.stock.queue
,交换器stock-event-exchange
,绑定键stock.release.#
给库存服务的库存延时队列和交换器添加一个绑定关系,绑定目的地stock.delay.queue
,交换器stock-event-exchange
,绑定键stock.delay.#
监听一个队列让以上所有容器组件都通过SpringBoot
自动去RabbitMQ
中检查创建
取消订单逻辑
订单服务队列和交换器组件和绑定关系
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 自定义RabbitMQ配置
* 1. 使用@Bean注解注入容器的队列、交换器、绑定关系如果在RabbitMQ服务器中没有SpringBoot会自动在RabbitMQ服务器中进行创建
* @创建日期 2024/11/26
* @since 1.0.0
*/
public class CustomRabbitMQConfig {
/**
* @return {@link Queue }
* @描述 订单延迟队列延时队列
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Queue orderDelayQueue(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange","order-event-exchange");
arguments.put("x-dead-letter-routing-key","order.release.order");
arguments.put("x-message-ttl",60000);
return new Queue("order.delay.queue", true, false, false, arguments);
}
/**
* @return {@link Queue }
* @描述 订单延迟队列路由队列
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Queue orderReleaseOrderQueue(){
return new Queue("order.release.order.queue",true,false,false);
}
/**
* @return {@link Exchange }
* @描述 订单服务通用主题交换器
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Exchange orderEventExchange(){
return new TopicExchange("order-event-exchange",true,false);
}
/**
* @return {@link Binding }
* @描述 延迟队列的延时队列order.delay.queue和订单服务通用交换器order-event-exchange的绑定关系
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Binding orderCreateOrderBinding(){
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null);
}
/**
* @return {@link Binding }
* @描述 延迟队列的路由队列order.release.order.queue和订单服务通用交换器order-event-exchange的绑定关系
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Binding orderReleaseOrderBinding(){
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}
public Binding orderReleaseStockBinding(){
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.other.#",
null);
}
}
订单创建成功就给交换器order-event-exchange
发送消息,消息路由键order.create.order
,保存的消息是OrderCreateTO.getOrder()
,消息会被交换器路由到order.delay.queue
[队列中的消息延时时间为30min],消息变成死信后路由键配置成order.release.order
并将消息转发到order-event-exchange
路由到order.release.order.queue
被订单服务监听,订单服务监听接收取消订单并将消息转发以路由键order.release.other
通过交换器order-event-exchange
,队列将消息转发给订单服务
[订单服务创建订单发起消息]
xxxxxxxxxx
/**
* @param orderParam
* @return {@link PayVo }
* @描述 创建订单
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/30
* @since 1.0.0
*/
//@GlobalTransactional
public PayVo createOrder(OrderSubmitParamVo orderParam) throws RRException{
UserBaseInfoVo userBaseInfo = LoginStatusInterceptor.loginUser.get();
String orderSn = IdWorker.getTimeId();
Long userId = userBaseInfo.getId();
//1. 校验订单提交令牌,校验失败抛出表单重复提交异常
if(!verifyOrderSubmitToken(userId,orderParam.getToken())){
throw new RRException(StatusCode.FORM_REPEAT_EXCEPTION.getMsg(),
StatusCode.FORM_REPEAT_EXCEPTION.getCode());
}
//2. 创建订单
OrderEntity order = createOrder(userId, orderParam.getAddrId(),orderSn);
//3. 创建订单项列表
//远程调用购物车服务通过用户id获取购物车被选中的购物项
List<OrderStatementVo.SelectedCartItemVo> selectedCartItems = cartFeignClient.getSelectedCartItem(userId);
if(selectedCartItems==null || selectedCartItems.size()<=0){
throw new RRException(StatusCode.NO_CART_ITEM_EXCEPTION.getMsg(),
StatusCode.NO_CART_ITEM_EXCEPTION.getCode());
}
List<OrderItemEntity> orderItems = createOrderItems(orderSn,selectedCartItems);
//4. 计算订单价格
calculateOrderAmount(order,orderItems);
//7. 验价
if (Math.abs(order.getPayAmount().subtract(orderParam.getTotalPrice()).doubleValue())>=0.01) {
throw new RRException(StatusCode.PRICE_VERIFY_EXCEPTION.getMsg()+
"核算金额: ¥"+orderParam.getTotalPrice()+",实际金额: ¥"+order.getPayAmount(),
StatusCode.PRICE_VERIFY_EXCEPTION.getCode());
}
//5. 保存订单记录和订单项记录
orderService.save(order);
//seata目前在AT模式下不支持批量插入记录,https://blog.csdn.net/qq_33240556/article/details/140790581,反正我们后面要换成软性事务,后面再换成批量插入
//for (OrderItemEntity orderItem : orderItems) {
// orderItemService.save(orderItem);
//}
orderItemService.saveBatch(orderItems);
//6. 锁定库存
LockStockTo lockStock = new LockStockTo();
lockStock.setOrderId(order.getId());
lockStock.setOrderSn(orderSn);
lockStock.setConsignee(order.getReceiverName());
lockStock.setConsigneeTel(order.getReceiverPhone());
lockStock.setDeliveryAddress(order.getReceiverDetailAddress());
lockStock.setPaymentWay(1);
lockStock.setOrderItems(orderItems);
R res = stockFeignClient.lockStock(lockStock);
if (res.getCode()!=0) {
throw new RRException((String) res.get("msg"),res.getCode());
}
// 8.给消息队列发送消息
OrderCreatedMessage orderCreatedMessage = new OrderCreatedMessage();
orderCreatedMessage.setOrderSn(orderSn);
rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderCreatedMessage);
//封装视图数据,注意存入分布式session的数据对应实体类最好全部放在common包下,session中有的数据不论其他服务能否用到,
// 其他服务必须包含session中所有数据的对应类型,否则其他服务即使没有使用对应的session也会直接报错
PayVo pay = new PayVo();
OrderVo orderVo = new OrderVo();
BeanUtils.copyProperties(order,orderVo);
List<OrderItemVo> orderItemVos = orderItems.stream().map(orderItem -> {
OrderItemVo orderItemVo = new OrderItemVo();
BeanUtils.copyProperties(orderItem, orderItemVo);
return orderItemVo;
}).collect(Collectors.toList());
pay.setOrder(orderVo);
pay.setOrderItems(orderItemVos);
pay.setFare(order.getFreightAmount());
pay.setPayablePrice(order.getPayAmount());
return pay;
}
[订单服务接收消息取消订单并向库存服务发送消息]
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 库存释放队列监听器
* @创建日期 2024/12/02
* @since 1.0.0
*/
queues = "order.release.order.queue") (
public class OrderReleaseQueueListenerImpl extends ServiceImpl<MessageDao,MessageEntity> implements OrderReleaseQueueListener {
RabbitTemplate rabbitTemplate;
OrderService orderService;
/**
* @param message
* @param orderCreatedMessage
* @param channel
* @描述 根据到期的锁定库存消息释放库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/02
* @since 1.0.0
*/
public void trySendMessageReleaseStock(Message message, OrderCreatedMessage orderCreatedMessage, Channel channel) {
orderService.cancelOrder(orderCreatedMessage.getOrderSn());
try{
rabbitTemplate.convertAndSend("order-event-exchange","order.release.other",orderCreatedMessage);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
} catch (Exception exception) {
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException ioException) {
ioException.printStackTrace();
}
MessageEntity messageLog = new MessageEntity();
messageLog.setMessageStatus(2);
messageLog.setClassType(orderCreatedMessage.getClass().getTypeName());
messageLog.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
messageLog.setToExchange(message.getMessageProperties().getReceivedExchange());
messageLog.setContent(JSON.toJSONString(orderCreatedMessage));
save(messageLog);
}
}
}
}
订单服务收到消息根据订单id查询数据库对应的订单状态,如果订单状态为订单创建对应的状态码,将订单状态更改为取消订单对应状态码
xxxxxxxxxx
/**
* @param orderSn
* @描述 根据订单号关闭订单
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/03
* @since 1.0.0
*/
public void cancelOrder(String orderSn) {
OrderEntity order = getOne(new QueryWrapper<OrderEntity>().eq("order_sn", orderSn));
order.setStatus(OrderConstant.OrderStatus.CANCELLED.getCode());
updateById(order);
}
通过监听消息队列消息在取消订单或者订单创建失败的情况下解锁库存
[消息队列监听]
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 库存释放队列监听器
* @创建日期 2024/12/02
* @since 1.0.0
*/
queues = "stock.release.stock.queue") (
public class StockReleaseQueueListenerImpl extends ServiceImpl<MessageDao, MessageEntity> implements StockReleaseQueueListener {
private WareSkuService wareSkuService;
/**
* @param message
* @param stockLockedMessage
* @param channel
* @描述 根据到期的锁定库存消息释放库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/02
* @since 1.0.0
*/
public void tryReleaseStock(Message message, StockLockedMessage stockLockedMessage, Channel channel) {
String orderSn = stockLockedMessage.getWareOrderTask().getOrderSn();
tryReleaseStock(orderSn,message,channel,JSON.toJSONString(stockLockedMessage),stockLockedMessage.getClass().toString());
}
/**
* @param message
* @param orderCreatedMessage
* @param channel
* @描述 取消订单自动解锁库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/03
* @since 1.0.0
*/
public void tryReleaseStock(Message message, OrderCreatedMessage orderCreatedMessage,Channel channel){
String orderSn = orderCreatedMessage.getOrderSn();
tryReleaseStock(orderSn,message,channel,JSON.toJSONString(orderCreatedMessage),orderCreatedMessage.getClass().toString());
}
/**
* @param orderSn
* @param message
* @param channel
* @param messageContent
* @param messageClassType
* @描述 解锁库存方法
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/03
* @since 1.0.0
*/
private void tryReleaseStock(String orderSn,Message message,Channel channel,String messageContent,String messageClassType){
try{
wareSkuService.tryReleaseStock(orderSn);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
} catch (Exception exception) {
try {
MessageEntity messageLog = new MessageEntity();
messageLog.setMessageStatus(2);
messageLog.setClassType(messageClassType);
messageLog.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
messageLog.setToExchange(message.getMessageProperties().getReceivedExchange());
messageLog.setContent(JSON.toJSONString(messageContent));
save(messageLog);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception excep) {
excep.printStackTrace();
}
}
}
}
}
[解锁库存]
xxxxxxxxxx
/**
* @param orderSn
* @描述 根据库存锁定消息尝试解锁库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/02
* @since 1.0.0
*/
public void tryReleaseStock(String orderSn) {
//1. 根据订单号远程查询订单服务对应的订单状态,如果订单不存在或者订单状态不是已关闭就不解锁库存
R res = orderFeignClient.getOrderByOrderSn(orderSn);
OrderTo order = res.get("order", new TypeReference<OrderTo>() {});
if(order==null && order.getStatus()==4){
return;
}
//2. 根据订单号查询库存工作单id
WareOrderTaskEntity wareOrderTaskEntity = wareOrderTaskService.getOne(new QueryWrapper<WareOrderTaskEntity>().
eq("order_sn", orderSn));
if(wareOrderTaskEntity==null){
return;
}
List<WareOrderTaskDetailEntity> wareOrderTaskDetails = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>().
eq("task_id", wareOrderTaskEntity.getId()));
for (WareOrderTaskDetailEntity wareOrderTaskDetail : wareOrderTaskDetails) {
//检索库存工作单状态,只有库存工作单状态为未解锁时才解锁库存,如果不为未解锁本条库存不解锁
if(wareOrderTaskDetail.getLockStatus()!=2){
baseMapper.releaseStock(wareOrderTaskDetail.getSkuNum(),wareOrderTaskDetail.getSkuId(),wareOrderTaskDetail.getWareId());
//解锁后将库存工作单状态更改为已解锁
wareOrderTaskDetail.setLockStatus(2);
wareOrderTaskDetailService.updateById(wareOrderTaskDetail);
}
}
}
❓:我们这里是用库存解锁时间大于取消订单时间来实现解锁库存只要订单的状态为已取消或者订单没有成功创建,就释放已经锁定的库存,但是这种方式存在很严重的问题;比如订单创建成功,但是由于各种原因,消息延迟了很久才发给消息队列,但是库存一锁定成功就将消息发送给消息队列了,导致解锁库存的消息比取消订单的消息先到期,这时候就会导致解锁库存的消息被消费,库存因为订单处于新建状态无法解锁,即使后续订单被解锁了库存也无法被解锁了;即一旦发生意外导致解锁库存的消息比取消订单的消息先到,就会发生被锁定的库存永远无法解锁的情况
🔑:让订单服务取消订单后再发一个消息路由键为order.release.other
给交换器order-event-exchange
,我们为交换器order-event-exchange
和队列stock.release.stock.queue
设定绑定关系,绑定关系设定为order.release.other.#
,让取消订单的消息被队列stock.release.stock.queue
发送给消费者库存服务。库存服务用@RabbitListener(queues="stock.release.stock.queue")
监听同一个队列stock.release.stock.queue
,用@RabbitHandler
标注的方法监听消息类型为OrderTo
,在原来解锁库存的逻辑中判断,当前库存是否解锁过,没解锁过就解锁,解锁过就不用解锁了,老师的逻辑是根据订单号查询库存工作单,根据库存工作单找到所有没有解锁的库存工作单详情调用此前解锁库存的方法进行解锁,感觉这里老师的实现不好,自己实现这部分代码
实际上解锁库存是订单取消的时候解锁一次,锁定库存成功以后一定时间再解锁一次
影响消息可靠性的因素
消息丢失:消息丢失在电商系统中是一个非常可怕的操作,比如订单消息丢失可能会影响到后续一连串比如商家确认、解锁库存、物流等等各种信息,消息可能发生丢失的原因如下:
消息从生产者发送出去,但是由于网络问题抵达RabbitMQ服务器失败,或者因为异常根本没有发送成功
这时候可以用try...catch
语句块来发送消息,发送失败在catch语句块中设置重试策略
同时给数据库创建一张消息数据库表mq_message
,建表语句如下
xxxxxxxxxx
CREATE TABLE `mq_message` (
`message_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '消息id',
`content` text COMMENT '消息内容',#序列化为json
`to_exchange` varchar(255) DEFAULT NULL COMMENT '投递交换器',
`routing_key` varchar(255) DEFAULT NULL COMMENT '路由键',
`class_type` varchar(255) DEFAULT NULL COMMENT '消息类型的全限定类名',
`message_status` int(1) DEFAULT '0' COMMENT '0-新建 1-已发送 2-错误抵达 3-已抵达',
`create_time` datetime DEFAULT NULL COMMENT '消息日志创建时间',
`update_time` datetime DEFAULT NULL COMMENT '消息日志更新时间',
PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
只要消息发送失败就给数据库存上这么一条日志,定期扫描数据库来检查消息日志状态来重新发送消息
消息到达Broker,消息只有被投递给队列才算持久化完成,一旦消息还没有到达队列,RabbitMQ服务器宕机消息就会因为还没有来得及持久化而发生丢失
开启生产者消息抵达队列确认,只要消息没有成功抵达队列就会触发生产者的returnCallback
回调,消息不能成功抵达应该设置消息重试发送和向数据库记录消息日志
开启生产者消息确认回调,只要消息成功抵达RabbitMQ服务器就触发该回调
自动ACK的状态,消费者收到消息,但是消息没有被成功消费,比如消费消息或者消费消息前出现异常或者服务器宕机,自动应答的消息会直接丢失
开启手动ACK,消息成功消费以后再手动应答接收消息,消息消费失败就手动拒绝消息让消息重新入队列,注意消息没有被应答即没有手动拒绝RabbitMQ没有收到应答的消息也会默认重新入队列再次发送
🔎:防消息丢失的核心就是做好消息生产者和消息消费者两端的消息确认机制,主要策略就是生产者的消息抵达确认回调和消费者的手动应答,凡是消息不能成功抵达服务端和消费端的消息都做好消息日志记录,定期扫描数据库,将发送失败的消息定期重新发送
消息重复:就是因为各种原因导致的消息重新投递
消息消费成功,事务已经提交,但是手动Ack的时候机器宕机或者网络连接中断导致手动Ack没有进行,RabbitMQ的消息因为没有收到应答自动将消息重新入队列并将消息状态从Unack
状态变成ready
状态,并再次将消息发送给消费者
消息消费过程中消费失败又再次重试发送消息,注意啊,虽然我们让消息消费失败消息拒绝重新入队列
解决办法是业务消息消费接口设计成幂等性接口,比如解锁库存要判断库存工作单详情的状态位,消息消费成功修改对应状态位
使用redis或者mysql防重表,将消息和业务通过唯一标识联系起来,业务被成功处理过的消息就不用再处理了
RabbitMQ的每个消息都有一个redelivered
消息属性字段,每个消息都可以通过Boolean redelivered = message.getMessageProperties().getRedelivered()
判断当前消息是否被第二次或者第N次重新投递过来的,这个一般做辅助判断,因为谁也不能保证消息在第几次消费被消费成功
消息积压:消息队列中的消息积压太多,导致消息队列的性能下降
消费者宕机导致消息积压
消费者消费能力不足,比如活动高峰期,比如消费者宕机导致的消费者集群消费能力不足,有服务完全不可用消息反复重入队列消息肯定会积压,应该设置重试次数,投递达到重试次数消息就被专门的服务处理比如存入数据库离线处理
注意消费者没有应答消费消息,队列中的消息处于Unack
状态,生产者会不停报错,让CPU飚高,非常消耗系统性能,这个问题要想办法防一下
发送者发送消息的流量太大,超出消费者的消费能力
限制发送者的流量,让服务限流业务进不来就能限制发送者的流量,不过只是因为消息中间件或者消费者能力有限就限制业务有点得不偿失
上线更多的消费者增强消息的消费能力
上线专门的消息队列消息消费服务,将消息批量从消息队列中取出来,直接写入数据库,缓解消息队列压力,然后再缓慢离线从数据库中获取消息离线处理
消息队列集群
一般都是把消息中间件专门做成一个服务,叫数据中台,负责消息发送和自动记录消息日志,消息发送失败自动进行重试,将消息发送的所有功能都考虑周到,其他服务通过调用该服务来实现消息的发送,看老师的意思,一般消息发送成功也得记录日志,这个可以作为防止消息丢失更进一步的手段,毕竟会影响性能
生产者抵达确认带数据库保存失败消息
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 RabbitMQ客户端自定义配置
* @创建日期 2024/11/01
* @since 1.0.0
*/
public class MallRabbitConfig {
private RabbitTemplate rabbitTemplate;
/**
* @return {@link MessageConverter }
* @描述 给容器中注入一个使用Jackson将消息对象序列化为json对象的消息转换器
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/01
* @since 1.0.0
*/
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
public void initRabbitTemplate(){
/*
1. 设置RabbitMQ服务器收到消息后的确认回调ConfirmCallback
配置配置项spring.rabbitmq.publisher-confirms=true
为rabbitTemplate设置回调实例化对象confirmCallback
*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 当前消息的唯一关联标识,里面的id就是消息标识的唯一id
* @param ack 消息是否成功收到
* @param cause 消息发送失败的原因
* @描述 1. 只要消息抵达Broker就会触发该回调,与消费者和消息是否入队列无关
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/02
* @since 1.0.0
*/
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("message confirm: {correlationData:"+correlationData+
"ack:"+ack+
"cause:"+cause+"}");
}
});
/*
2.设置RabbitMQ队列没有收到消息的确认回调ReturnCallback
配置配置项spring.rabbitmq.publisher-returns=true
配置配置项spring.rabbitmq.template.mandatory=true
为rabbitTemplate设置回调实例化对象returnCallback
* */
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* @param message 投递失败的消息本身的详细信息
* @param replyCode 导致消息投递失败的错误状态码
* @param replyText 导致消息投递失败的错误原因
* @param exchange 当时该消息发往的具体交换器
* @param routingKey 当时该消息的具体路由键
* @描述
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/03
* @since 1.0.0
*/
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("message lose: {message:"+message+
"replyCode:"+replyCode+
"replyText:"+replyText+
"exchange:"+exchange+
"routingKey"+routingKey);
}
});
}
}
消费者手动ACK
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 库存释放队列监听器
* @创建日期 2024/12/02
* @since 1.0.0
*/
queues = "stock.release.stock.queue") (
public class StockReleaseQueueListenerImpl extends ServiceImpl<MessageDao, MessageEntity> implements StockReleaseQueueListener {
private WareSkuService wareSkuService;
/**
* @param message
* @param stockLockedMessage
* @param channel
* @描述 根据到期的锁定库存消息释放库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/02
* @since 1.0.0
*/
public void tryReleaseStock(Message message, StockLockedMessage stockLockedMessage, Channel channel) {
String orderSn = stockLockedMessage.getWareOrderTask().getOrderSn();
tryReleaseStock(orderSn,message,channel,JSON.toJSONString(stockLockedMessage),stockLockedMessage.getClass().toString());
}
/**
* @param message
* @param orderCreatedMessage
* @param channel
* @描述 取消订单自动解锁库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/03
* @since 1.0.0
*/
public void tryReleaseStock(Message message, OrderCreatedMessage orderCreatedMessage,Channel channel){
String orderSn = orderCreatedMessage.getOrderSn();
tryReleaseStock(orderSn,message,channel,JSON.toJSONString(orderCreatedMessage),orderCreatedMessage.getClass().toString());
}
/**
* @param orderSn
* @param message
* @param channel
* @param messageContent
* @param messageClassType
* @描述 解锁库存方法
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/03
* @since 1.0.0
*/
private void tryReleaseStock(String orderSn,Message message,Channel channel,String messageContent,String messageClassType){
try{
wareSkuService.tryReleaseStock(orderSn);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
} catch (Exception exception) {
try {
MessageEntity messageLog = new MessageEntity();
messageLog.setMessageStatus(2);
messageLog.setClassType(messageClassType);
messageLog.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
messageLog.setToExchange(message.getMessageProperties().getReceivedExchange());
messageLog.setContent(JSON.toJSONString(messageContent));
save(messageLog);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception excep) {
excep.printStackTrace();
}
}
}
}
}
使用MP数据库消息日志记录
同时给数据库创建一张消息数据库表mq_message
,建表语句如下
xxxxxxxxxx
CREATE TABLE `mq_message` (
`message_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '消息id',
`content` text COMMENT '消息内容',#序列化为json
`to_exchange` varchar(255) DEFAULT NULL COMMENT '投递交换器',
`routing_key` varchar(255) DEFAULT NULL COMMENT '路由键',
`class_type` varchar(255) DEFAULT NULL COMMENT '消息类型的全限定类名',
`message_status` int(1) DEFAULT '0' COMMENT '0-新建 1-已发送 2-错误抵达 3-已抵达',
`create_time` datetime DEFAULT NULL COMMENT '消息日志创建时间',
`update_time` datetime DEFAULT NULL COMMENT '消息日志更新时间',
PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
MP插入消息记录
[实体类]
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 库存服务消息实体
* @创建日期 2024/12/02
* @since 1.0.0
*/
"sms_message") (
public class MessageEntity implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 消息id
*/
private Long messageId;
/**
* 消息内容
*/
private String content;
/**
* 投递交换器
*/
private String toExchange;
/**
* 路由键
*/
private String routingKey;
/**
* 消息类型的全限定类名
*/
private String classType;
/**
* 0-新建 1-已发送 2-错误抵达 3-已抵达
*/
private Integer messageStatus;
/**
* 消息日志创建时间
*/
fill = FieldFill.INSERT) (
private Date createTime;
/**
* 消息日志更新时间
*/
fill = FieldFill.INSERT_UPDATE) (
private Date updateTime;
}
[持久化接口]
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 库存服务消息
* @创建日期 2024/12/02
* @since 1.0.0
*/
public interface MessageDao extends BaseMapper<MessageEntity> {
}
[持久化接口对应xml]
xxxxxxxxxx
<mapper namespace="com.earl.mall.stock.dao.MessageDao">
<!-- 可根据自己的需求,是否要使用 -->
<resultMap type="com.earl.mall.stock.entity.MessageEntity" id="messageMap">
<result property="messageId" column="message_id"/>
<result property="content" column="content"/>
<result property="toExchange" column="to_exchange"/>
<result property="routingKey" column="routing_key"/>
<result property="classType" column="class_type"/>
<result property="messageStatus" column="message_status"/>
<result property="createTime" column="create_time"/>
<result property="updateTime" column="update_time"/>
</resultMap>
</mapper>
[业务实现类]
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 库存释放队列监听器
* @创建日期 2024/12/02
* @since 1.0.0
*/
queues = "stock.release.stock.queue") (
public class StockReleaseQueueListenerImpl extends ServiceImpl<MessageDao, MessageEntity> implements StockReleaseQueueListener {
private WareSkuService wareSkuService;
/**
* @param message
* @param stockLockedMessage
* @param channel
* @描述 根据到期的锁定库存消息释放库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/02
* @since 1.0.0
*/
public void tryReleaseStock(Message message, StockLockedMessage stockLockedMessage, Channel channel) {
String orderSn = stockLockedMessage.getWareOrderTask().getOrderSn();
tryReleaseStock(orderSn,message,channel,JSON.toJSONString(stockLockedMessage),stockLockedMessage.getClass().toString());
}
/**
* @param message
* @param orderCreatedMessage
* @param channel
* @描述 取消订单自动解锁库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/03
* @since 1.0.0
*/
public void tryReleaseStock(Message message, OrderCreatedMessage orderCreatedMessage,Channel channel){
String orderSn = orderCreatedMessage.getOrderSn();
tryReleaseStock(orderSn,message,channel,JSON.toJSONString(orderCreatedMessage),orderCreatedMessage.getClass().toString());
}
/**
* @param orderSn
* @param message
* @param channel
* @param messageContent
* @param messageClassType
* @描述 解锁库存方法
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/03
* @since 1.0.0
*/
private void tryReleaseStock(String orderSn,Message message,Channel channel,String messageContent,String messageClassType){
try{
wareSkuService.tryReleaseStock(orderSn);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
} catch (Exception exception) {
try {
MessageEntity messageLog = new MessageEntity();
messageLog.setMessageStatus(2);
messageLog.setClassType(messageClassType);
messageLog.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
messageLog.setToExchange(message.getMessageProperties().getReceivedExchange());
messageLog.setContent(JSON.toJSONString(messageContent));
save(messageLog);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception excep) {
excep.printStackTrace();
}
}
}
}
}
这是非常流行的任务调度器框架,官网:Quartz Enterprise Job Scheduler
定时任务的执行时刻
对于多生态的超大系统,比如尚硅谷[atguigu.com
]旗下有尚硅谷在线教育[gulixueyuan.com
]、有尚硅谷电子商城[gulimall.com
]、众筹系统[gulifunding.com
]等等,如果我们每个小系统都设置一个账号,用户使用旗下的每个产品都需要注册一个账号,这样用户使用体验非常不好
在这种超大型系统中,我们希望专门抽取一个认证中心专门来处理用户登录业务,只要用户在一个小系统中进行了登录,任何其他系统都能保持用户的登录状态,在任何一个小系统中登出,就在整个系统中的其他应用也登出,而且这些小系统的顶级域名还可以不一样,这就是单点登录
这种单点登录是不能使用session来解决不同系统中用户登录状态的识别问题,因为下发cookie的作用域最大只能放大到一级域名即顶级域名,只要请求访问的顶级域名变化了就无法携带其他顶级域名和子域服务下发的cookie,就没法拿着这个cookie去验证用户的登录状态
开源的单点登录demo
码云搜索徐雪里/xxl-sso,这是一个XXL社区提供的分布式单点登录框架,下载压缩包或者克隆到本地
项目目录结构
xxl-sso-core
是核心包
xxl-sso-server
是登录中心服务器
xxl-sso-samples
是一些简单的例子,有基于cookie
和session
的xxl-sso-web-sample-springboot
,也有基于token
的xxl-sso-token-sample-springboot
,这两个就是下面使用逻辑的客户端
认证中心配置文件解析[配置文件在目录xxl-sso-server
中,后面没有特殊说明都在该目录下]
xxxxxxxxxx
### web
server.port=8080 #服务器的端口是8080
server.servlet.context-path=/xxl-sso-server #访问路径为8080/xxl-sso-server
### resources
spring.mvc.servlet.load-on-startup=0
spring.mvc.static-path-pattern=/static/**
spring.resources.static-locations=classpath:/static/
### freemarker
spring.freemarker.templateLoaderPath=classpath:/templates/
spring.freemarker.suffix=.ftl
spring.freemarker.charset=UTF-8
spring.freemarker.request-context-attribute=request
spring.freemarker.settings.number_format=0.##########
### xxl-sso
xxl.sso.redis.address=redis://127.0.0.1:6379 #配置了redis,我们需要将其改成自己的redis服务器配置
xxl.sso.redis.expire.minute=1440
测试目录xxl-sso-samples
下客户端的配置文件解析
[xxl-sso-web-sample-springboot
]
使用命令mvn clean package -Dmaven.skip.test=true
,如果单独对这个子项目打包,注意这个子项目依赖核心包xxl-sso-core
,在对xxl-sso-web-sample-springboot
进行打包的时候会去本地仓库找对应的核心包依赖,只打包当前项目是不会安装到本地仓库的,因此需要先在核心包xxl-sso-core
的pom.xml
所在目录下使用命令mvn install
将该核心包安装到本地仓库中,这种自定义的所有被依赖包必须在本地仓库有了才能对当前项目进行打包
我们可以直接在父项目pom.xml
所在使用命令mvn clean package -Dmaven.skip.test=true
一次性将所有的子项目都打包好,这个就是谷粒商城的手动打包方式,如果是整体打包,打包子项目就不需要单独将被依赖的包安装到本地仓库中
使用命令java -jar xxl-sso-web-sample-springboot-1.1.1-SNAPSHOT.jar --server.port=8081
来启动测试实例项目
注意可以在配置文件更改服务的端口,也可以在启动命令中更改服务实例的端口
使用命令java -jar xxl-sso-web-sample-springboot-1.1.1-SNAPSHOT.jar --server.port=8082
来启动测试实例项目
xxxxxxxxxx
### web
server.port=8081
server.servlet.context-path=/xxl-sso-web-sample-springboot #访问路径默认是8081/xxl-sso-web-sample-springboot
### resources
spring.mvc.servlet.load-on-startup=0
spring.mvc.static-path-pattern=/static/**
spring.resources.static-locations=classpath:/static/
### freemarker
spring.freemarker.templateLoaderPath=classpath:/templates/
spring.freemarker.suffix=.ftl
spring.freemarker.charset=UTF-8
spring.freemarker.request-context-attribute=request
spring.freemarker.settings.number_format=0.##########
### xxl-sso
xxl.sso.server=http://xxlssoserver.com:8080/xxl-sso-server #这个是认证中心的地址,这个地址需要和实际认证中心的地址保持一致
xxl.sso.logout.path=/logout
xxl-sso.excluded.paths=
xxl.sso.redis.address=redis://127.0.0.1:6379 #配置redis将redis改成我们自己的服务器配置
框架业务使用示例和逻辑
我们准备三个顶级域名ssoserver.com
、client1.com
、client2.com
,我们将ssoserver
作为登录认证服务器,将client1.com
和client2.com
作为我们的小系统,用来模拟跨顶级域名的单点登录测试效果,小系统就是上面说的xxl-sso-samples
目录下
使用SwitchHost
更改三个域名都映射到本机,如果不使用SwitchHost
需要更改路径C:\windows\System32\drivers\etc\hosts
启动服务xxl-sso-server
,我们在启动的时候需要将源文件打包,pom.xml
中直接聚合了三个目录,我们可以一次性将整个项目都打包了,在pom.xml
所在目录打开cmd
窗口,使用命令mvn clean package -Dmaven.skip.test=true
清包打包跳过Maven测试[即使没有打过包的也要进行清包,其他项目也是如此,避免出现莫名其妙的问题],打完包直接使用java -jar
命令启动
xxxxxxxxxx
<modules>
<module>xxl-sso-core</module>
<module>xxl-sso-server</module>
<module>xxl-sso-samples</module>
</modules>
通过请求路径http://ssoserver.com:8080/xxl-sso-server
访问认证中心
分别使用两个端口启动项目xxl-sso-web-sample-springboot
,注意把配置文件给改对了,要保证redis
服务器的地址和认证中心的地址正确
分别通过请求路径http://client1.com:8081/xxl-sso-web-sample-springboot
和http://client2.com:8082/xxl-sso-web-sample-springboot
访问一个测试服务两个服务实例
验证任意一个认证中心登录,所有不同顶级域名的服务实例都登录,任意一个服务实例登出,所有不同顶级域名的服务实例都登出
方案一:Token令牌机制
最常见的场景是验证码场景,比如买票锁定座位12306需要我们输入验证码,请求中只有验证码正确的情况下才能锁定座位成功,而且验证码使用一次就失效
应用在提交订单业务上我们可以在给用户响应订单确认页的同时下发一个Token令牌,服务器存储了该令牌,用户提交订单的时候携带该令牌,只要提交订单的请求携带了该令牌,我们就验证通过创建订单,只要一次验证令牌通过服务器就删除该令牌,用户的多次提交最终只有一个请求能验证通过
如何比较完美地执行验证令牌操作才能尽可能保证不出错,主要考虑是先删令牌再创建订单还是先创建订单再删除令牌,
如果是后删令牌问题非常大,前一个请求处于创建订单期间第二个请求就开始执行令牌验证操作,由于前一个请求订单还没创建令牌还没来得及删除,第二个请求会验证通过,此时第二个请求也会开始创建订单,因此我们一般首选先删令牌
如果是先删令牌
可能存在订单没有成功创建服务器宕机导致订单创建失败的问题;
同时分布式场景下服务器的令牌不会直接存在本地缓存,一般都是存在redis中,从Redis中存取数据就会存在延迟,此时如果两个请求间隔时间很短,两个请求都成功从Redis中获取到令牌,同时验证成功,也会发生接口的幂等性问题;因此我们一般会要求验证令牌时获取令牌、验证令牌和删除令牌三个操作是一个原子性操作,我们可以使用lua脚本if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end
来保证三步操作的原子性,让验证操作直接在Redis中原子性一步执行
方案二:锁机制
数据库悲观锁
select * from xxx where id=1 for update;
来使用数据库行级锁,悲观锁一般结合事务一起使用,定位记录的查询条件一定要设置成主键或者唯一键索引,否则很容易造成锁表,锁表处理起来非常麻烦
数据库乐观锁
数据库乐观锁update t_goods set count=count-1,version=version+1 where goodId=2 and version=1;
更适合更新场景,给数据库表添加一个版本字段,只有获取的数据版本和更新时的数据版本相同更新操作才会成功,更新操作会单增数据版本,只要一次数据成功更新,多次请求中后续到达的请求就无法再修改数据了[这种场景很适合库存数据的更新,比如第一次操作库存服务更新了库存数据并更新了版本号,但是操作库存数据的请求响应给订单服务时出现了网络问题,订单服务再次调用库存服务,但是订单服务最初获取的库存数据版本是旧的,库存数据的版本已经发生了更新,后续的重试不论执行多少遍都不会成功,我们可以使用额外信息来判断更新操作的具体订单,乐观锁适合读多写少的业务场景]
分布式锁
多台服务实例执行相同的写操作,我们对被操作对象上分布式锁,拿到锁的机器执行写操作,写的同时给数据一个标志位,当其他机器拿到锁以后先检查标志位发现数据已经发生了写操作就直接释放锁不再执行写操作了
方案三:唯一约束
唯一键索引:比如给用户的订单确认页下发唯一标识并保留在服务器内部,给该唯一标识对应的字段添加唯一索引,生成订单向数据库插入该唯一标识,只有第一次生成的订单数据因为唯一键索引能成功写入数据库,后续插入订单记录因为唯一键约束就会插入失败
Redis的set防重:数据只能被处理一次,我们可以在处理数据时计算数据的MD5并将MD5密文存入Redis的set中,每次处理数据前先检查一下数据的MD5是否已经存在,如果已经存在就不处理了,百度网盘的秒传功能就是这样的
方案四:防重表
上面的Redis
防重也算防重表的一种
比如建立一个去重表,使用订单号orderNo
作为去重表的唯一键索引,将订单号插入去重表再做业务处理,保证订单号插入去重表和业务处理在同一个事务中;这样因为去重表中有订单号作为唯一键索引,多次提交的后续请求就会因为无法向去重表插入数据导致请求失败,业务处理不会执行;同时即使去重表数据插入成功,只要业务处理失败因为事务也会将去重表的数据回滚,让订单的后续重试能够继续进行
因为要保证去重表和业务表的整体事务,需要将去重表和业务表放在同一个数据库中
方案五:给请求下发全局唯一标识ID
调用接口时我们可以给请求指定一个全局唯一的ID,接口处理该请求的时候将全局唯一的请求ID存储到Redis中,处理业务请求的时候如果发现该请求标识已经存在了我们就不再对该请求进行处理直接返回成功
这种请求全局唯一标识ID还可以做服务调用链路追踪,追踪请求经过了哪些服务
Nginx给每个请求分配唯一Id的配置proxy_set_header X-Request-Id $request_id;
,这个一般做链路追踪,不能做防重处理,因为Nginx给每个请求分配的全局唯一ID都是不一样的,即重复提交的每个请求Nginx都会给请求分配一个全局的唯一ID;我们自定义的请求可以参考这种思路给请求头设置一个全局唯一的请求标识ID,特别是做Feign的远程调用请求,重试的请求请求头中的唯一标识ID都是一样的
我们采用Token
令牌机制来给订单确认页返回一个唯一令牌,并将该令牌存入Redis
中,感觉存到session
中最好,这样可以天然避免第三方恶意操作用户的订单token
令牌
前端业务逻辑
我们给订单确认页返回一个Token
令牌,这里为了简单直接使用UUID
,用一个type
属性值为hidden
的input
框以token
作为输入框的值来保存该Token
令牌,这样能直接在表单提交时自动作为参数上传
给分布式session
中即Redis
中存放该Token
令牌,这里为了避免发生恶意对Token
令牌乱用,同时session
自动就有有效时间,用户不关浏览器但是一段时间不操作Token
令牌就会自动失效,用户没有确认订单关闭了浏览器Token
令牌也会直接失效,老师是使用session
中存储的用户id加token
的名字的方式作为key
存储的,value
存储token
本身
xxxxxxxxxx
//7. 设置用户提交订单的校验令牌
String orderSubmitToken = UUID.randomUUID().toString().replace("-", "");
redisTemplate.opsForValue().set(OrderConstant.ORDER_SUBMIT_TOKEN_PREFIX+":"+userBaseInfo.getId(),
orderSubmitToken,
OrderConstant.ORDER_SUBMIT_TOKEN_TTL,
TimeUnit.MINUTES);
orderStatement.setToken(orderSubmitToken);
封装提交订单时上传的数据[为了防止前端数据上传恶意篡改导致的数据错误引起问题纠纷],给提交订单按钮做一个form
表单,所有要提交的数据都做成隐藏的input
框,实际上该项目只提交了收货地址、订单最后价格、防重令牌
封装用户收货地址的ID,页面初始化或者用户切换收货地址的时候将输入框的值进行回填$("#addrIdInput").val(addrId)
封装用户的支付方式Integer
商品无需提交,再去购物车获取一遍商品[京东也是这样做的,这样可以避免确认订单请求被恶意篡改,就是我们先调出订单确认页,此时已经计算了购物车中的选中商品价格,我们再修改购物车被选中商品,此时总价肯定发生了变化,我们在原来的订单确认页点击提交订单,我们发现最新的价格变成了购物车选中商品后的价格,这说明两个情况,第一是京东提交订单以后是重新获取购物车数据来计算最终价格的,第二是京东没有对订单确认页的商品金额做验价],重新计算金额
防重令牌,做接口幂等性校验,页面模板引擎渲染时赋值
封装订单确认页总价,提交订单后重新获取购物车商品数据计算价格以后对价格进行校验,如果校验通过说明购物车中被选中的商品在订单确认期间没有发生变化,如果校验没通过说明购物车中的商品发生了变化,我们可以通知用户购物车商品发生了变化,让用户注意一下,计算应付总价时赋值
订单备注信息,这个这里没实现,有需要再实现
xxxxxxxxxx
<form method="post" action="/order/order/submit">
<input type="hidden" name="addrId" id="addr-id">
<input type="hidden" name="totalPrice" id="total-price">
<input type="hidden" name="token" th:value="${orderStatement.token}">
<button type="submit" class="tijiao">提交订单</button>
</form>
<script>
function queryFare(){
$.get("http://earlmall.com/api/ware/wareordertask/calculate/fare?addrId="+$(".addr-receive p[selectedStatus='1']").attr("addrId"),function (data){
var userAddress = data.fare.userAddress;
$(".yfze_b").text("寄送至: "+
userAddress.province+" "+
userAddress.city+" "+
userAddress.region+" "+
userAddress.detailAddress+" 收货人:"+
userAddress.name+" "+
userAddress.phone);
$("#addr-id").val(userAddress.id);
calculateFareAndPayablePrice(data.fare.fare);
})
}
function calculateFareAndPayablePrice(fare){
$("#fare").text("¥ "+fare*1+".00");
$(".hq").text("¥ "+([[${orderStatement.payablePrice}]]*1+fare*1)+".00");
$("#total-price").val([[${orderStatement.payablePrice}]]*1+fare*1);
}
</script>
后端创建订单逻辑
视图跳转逻辑
下单成功跳转支付页
下单失败携带异常信息跳回订单确认页重新确认订单
xxxxxxxxxx
/**
* @param orderParam
* @return {@link String }
* @描述 创建用户订单
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/30
* @since 1.0.0
*/
"/order/submit") (
public String createOrder(OrderSubmitParamVo orderParam, HttpSession session, RedirectAttributes attributes){
try{
PayVo pay = orderWebService.createOrder(orderParam);
session.setAttribute("pay",pay);
return "redirect:http://order.earlmall.com/pay.html";
}catch (RRException e){
attributes.addFlashAttribute("error",e.getMsg());
return "redirect:http://order.earlmall.com/statement.html";
}
}
"/pay.html") (
public String getViewPage(){
return "pay";
}
验证令牌,创建用户订单,校验价格,锁订单库存
xxxxxxxxxx
/**
* @param orderParam
* @return {@link PayVo }
* @描述 创建订单
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/30
* @since 1.0.0
*/
public PayVo createOrder(OrderSubmitParamVo orderParam) throws RRException{
UserBaseInfoVo userBaseInfo = LoginStatusInterceptor.loginUser.get();
String orderSn = IdWorker.getTimeId();
Long userId = userBaseInfo.getId();
//1. 校验订单提交令牌,校验失败抛出表单重复提交异常
if(!verifyOrderSubmitToken(userId,orderParam.getToken())){
throw new RRException(StatusCode.FORM_REPEAT_EXCEPTION.getMsg(),
StatusCode.FORM_REPEAT_EXCEPTION.getCode());
}
...
}
/**
* @param token
* @return {@link Long }
* @描述 校验并删除提交订单的令牌
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/30
* @since 1.0.0
*/
private boolean verifyOrderSubmitToken(Long userId,String token){
String key = OrderConstant.ORDER_SUBMIT_TOKEN_PREFIX + ":" + userId;
String script = "if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
return redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class),
Arrays.asList(key), token)==1;
}
本地事务
数据库事务的特性[ACID]:
原子性[一系列操作整体性不可拆分,即要么整体成功、要么整体失败]、
一致性[整体数据操作前后守恒]、
隔离性[或独立性,事务之间相互隔离,一个业务操作失败回滚不会影响其他业务操作]、
持久性[事务一旦成功提交,数据就一定会落盘到数据库,认为是先落盘再提示事务成功提交]
本地事务的应用场景是单体应用连接一个数据库,没有多个数据库、没有涉及服务拆分、也没有涉及服务间的远程调用
Spring提供的本地事务注解@Transactional
注解
Spring框架提供了一个@Transactional
注解来使用本地事务
隔离级别:隔离级别是SQL数据库规定的一些规范
READ UNCOMMITTED[读未提交]:设置该隔离级别的事务可以读到其他未提交事务的数据,这会导致脏读现象[比如读未提交的数据被回滚了,但是其他操作已经拿到回滚前的数据进行后续的计算]
READ COMMITTED[读已提交]:设置该隔离级别的事务可以读取已经提交事务的数据,这是Oracle
和SqlServer
的隔离级别,特点是每次读取都读取的是实际值,同一系列操作中两次读取的数据不同都是实际值
REPEATABLE READ[可重复读]:设置该隔离级别的事务读取到的数据是事务开始时的数据,事务期间读取的数据都是相同的,这是MYSQL默认的隔离级别,特点是存在幻读现象,即实际数据在业务处理期间已经发生变化,但仍然使用的事务开启时的数据
mysql
的InnoDB
引擎可以通过next-key locks
即行锁算法机制来避免幻读
SERIALIZABLE[序列化]:设置该隔离级别的事务全是串行顺序执行的,MySQL
数据库的InnoDB
引擎会给读操作隐式加一把读共享锁,避免脏读、不可重复读、幻读问题,但是这也意味着使用这种隔离级别数据库操作就失去了并发能力
🔎:通过@Transactional(isolation=Isolation.READ_COMMITTED)
可以指定当前事务的隔离级别
事务传播行为[Spring中的事务管理行为]:事务的传播行为就是指被调用方法的事务和调用方法的事务之间的关系,比如a方法需要开启一个事务,b方法也需要开启一个事务,a方法调用b方法,那么a,b方法开启的事务之间的关系
PROPAGATION_REQUIRED
:如果当前业务操作还没有事务,就创建一个新事务;如果当前业务操作已经存在事务就加入该事务,一旦事务中任何一处失败事务中的所有操作都回滚
一旦b方法的事务设置成PROPAGATION_REQUIRED
,如果b方法被开启了事务的a方法调用,b方法的事务配置就会完全失效,比如b方法单独设置了代码执行超过7s就会回滚@Transactional(propagation=Propagation.REQUIRED,timeout=7)
,a方法设置了代码执行超过30s就会回滚,b方法被a方法调用,那么b方法的事务配置就会直接失效,即a事务的设置会自动传播到和a方法共用一个事务的方法
PROPAGATION_REQUIRED_NEW
:不论当前业务操作有没有事务,都为被@Transactional
注解标注的方法创建一个新的事务,该方法出现问题只会回滚该方法的业务操作,其他事务出问题不会影响该方法的事务行为
PROPAGATION_SUPPORTS
:如果当前业务操作已经存在事务就加入该事务,如果当前业务操作还没有事务就以非事务的方式执行
PROPAGATION_MANDATORY
:如果当前业务操作已经存在事务就加入该事务,如果当前业务操作还没有事务就直接抛出异常
PROPAGATION_NOT_SUPPORTED
:以非事务的方式执行当前被标注方法,如果当前业务操作已经存在事务,就将当前事务挂起
PROPAGATION_NEVER
:以非事务的方式执行当前被标注方法,如果当前业务操作已经存在事务,就直接抛出异常
PROPAGATION_NESTED
:如果当前业务操作已经存在事务,被标注方法会在当前事务内部创建一个子事务,子事务的特点是会在子事务开启的时刻创建一个保存点,子事务失败只会回退到该保存点,不会回滚整个事务,父事务中的其他操作仍然能正常执行;但是如果父事务回滚,子事务也会一起回滚;如果当前业务操作还没有事务就为被标注方法创建一个全新的事务
这个事务传播行为和PROPAGATION_REQUIRED_NEW
很像,被标注方法的事务失败不会影响业务操作的其他事务,主要区别如下;PROPAGATION_REQUIRES_NEW
会挂起当前事务,并创建一个全新的事务,这意味着新事务与原事务完全独立,事务成功或者失败不会相互影响。而PROPAGATION_NESTED
则是在当前事务内部创建一个子事务,如果父事务被回滚,子事务也会被回滚,但子事务的回滚不会影响到父事务。
SpringBoot
中的本地事务
SpringBoot
中也是默认使用Spring
的本地事务注解@Transactional
SpringBoot
中使用@Transactional
注解的坑
❓:在SpringBoot
中,如果a方法、b方法、c方法是同一个Service
中的方法,三个方法都标注了@Transactional
注解且都做了个性化配置,比如都各自配置了不同的事务传播行为和超时时间,此时如果a方法调用了b方法和c方法,b方法和c方法的任何事物配置都不会生效,包括事物传播行为,都是和a方法共用同一个事务
🔑:这是AOP
的问题,在之前SpringCache
中就出现过,那儿老师没有解释,事务注解是通过AOP实现的,事务是通过代理对象orderService
来控制的,如果直接调用同一个类中的实例方法本质上相当于跳过代理对象直接通过方法名调用同一个类中的方法,就类似于代码的复制粘贴,被调用的方法事务注解不会生效;根本原因就是绕过了代理对象,这一块不太熟,后面复习的时候深入理解一下,a方法是通过代理对象调用的,但是b方法和c方法只是单纯地将代码复制粘贴过来,我们通过orderService
来调用被标注了基于AOP实现的事务或者缓存注解的方法对应的注解会生效,但是同一个类下的代码的相互调用是类似于直接复制拷贝代码的形式,通过this
来调用也是没用的,this
也会被处理成同一个对象从而直接复制拷贝被调用方法的代码,只有通过代理对象类似于orderService.a()
来调用事务注解或者缓存注解才会生效,总之基于AOP实现的注解都需要通过代理对象来调用才会生效,通过this
调用也是不生效的;而且一定不要企图在orderService
中自动注入orderService
来通过代理对象调用b方法或者c方法,这相当于orderService
依赖于orderService
,在构造orderService
将其注入容器时会发现构造orderService
自身有一个属性需要注入orderService
,会造成循环依赖的问题,系统启动的时候就要爆炸
🔑:同一个对象内基于AOP实现的注解标注方法相互调用注解功能失效的解决办法,核心是要使用代理对象来调用标注了基于AOP实现的注解的方法
引入Spring的AOP动态代理场景启动器spring-boot-starter-aop
,引入该场景启动器的目的是使用其依赖的aspectjweaver
,这个动态代理更加强大
在配置类上使用注解@EnableAspectJAutoProxy
开启Aspectj
动态代理,不使用该注解默认使用的是JDK默认的按照接口自动生成的动态代理,使用该注解所有的动态代理都是Aspectj
创建的,使用Aspectj
的好处是即使没有接口也可以创建动态代理;在@EnableAspectJAutoProxy
中指定exposeProxy
属性为true
即@EnableAspectJAutoProxy(exposeProxy=true)
来对外暴露代理对象
只要设置了通过Aspectj
创建代理对象,我们就可以在任何地方通过org.springframework.aop.framework.AopContext
即AOP上下文的AopContext.currentProxy()
拿到当前代码所在对象对应的代理对象[Object类型,需要强转为当前代码所在对象的类型来调用所在对象的b方法或者c方法],直接通过代理对象来调用b方法或者c方法,这样b方法和c方法的事务注解包括此前的缓存注解才会生效,示例方法如下所示:
xxxxxxxxxx
timeout=30) (
public void a(){
OrderServiceImpl orderService = (OrderServiceImpl)AopContext.currentProxy();
orderService.b();
orderService.c();
}
propagation=Propagation.REQUIRED,timeout=2) (
public void b(){
}
propagation=Propagation.REQUIRES_NEW,timeout=20) (
public void b(){
}
分布式环境下本地事务存在问题
虽然我们给创建订单和锁定库存都分别添加了@Transactional
本地事务注解来各自开启单体事务,但是因为订单服务和库存服务处在不同的服务实例,因此事务不能跨服务生效,订单成功创建但是库存没有成功扣减,只会回滚库存成功锁定商品的记录,无法回滚已经创建订单记录,这是单体事务的局限
解决办法,我们直接根据远程调用的结果判断,在创建订单记录的服务中判断锁库存的状态,如果锁库存失败,我们直接在订单服务抛异常来让订单服务的事务进行回滚,这样也能控住分布式事务,让多个数据库一起回滚
但是通过抛出异常和单体事务结合的方案不能完美解决事务问题
被调用服务成功执行,调用服务可能由于网络中断、调用超时导致被调用服务成功执行,调用服务回滚[这种情况叫远程服务假失败]。因为被调用服务本身可能不会出现异常正常执行,但是由于网络中断,系统卡死导致响应超时等都会导致在调用方抛出异常,这样就会导致远程调用成功添加记录,但是调用方由于网络,远程调用超时而抛出异常来回滚,导致订单相关数据没有一起回滚;比如锁定库存服务响应慢,库存锁定成功了,但是订单服务远程调用超时,订单服务感知到远程调用出问题了抛异常事务回滚,但是远程服务正常执行不会进行回滚,就会出现订单取消了,但是库存给锁定了
已经被调用并成功执行的远程服务在订单创建失败的情况下无法自动进行回滚。远程服务调用期间没有出现问题,方法执行结束事务就已经结束了,后续订单服务运行期间出现任何问题导致需要回滚,已经执行完毕的远程服务无法自动进行事务回滚,我们想要手动回滚还需要专门写释放对应库存记录的方法
本地事务在分布式系统下只能控制住本地连接的事务回滚,控制不了其他服务和连接的事务回滚,在分布式系统下,本地事务控制不住事务的根本原因是网络中断+不同数据库+服务实例集群,而本地事务只能控制一个连接内的事务
CAP定理:一个分布式系统中,以下三个要素最多只能同时实现两个,不可能三者兼顾
一致性[Consistency]:同一时刻对分布式系统中的任意一个节点的某个数据进行访问,一定是获取的最新的相同的值,形象地说就是任意一个数据节点更新某个数据完成时间点以后,访问任意一个数据节点都获取的是更新后的数据或者数据副本,注意这个一致性说的是强一致性,即任何时间点访问任何机器上的数据都是确定相同的值
从客户端角度,多进程并发访问时,更新过的数据在不同进程中的获取策略决定了不同的一致性,对于关系型数据库不同的一致性要求如下
强一致性:更新成功的数据能够被后续所有请求唯一访问
弱一致性:更新成功的数据,系统能够容忍后续部分请求或者全部请求都访问不到
最终一致性:更新成功的数据,经过一段时间弱一致性后所有的请求都能唯一访问,即我们能忍受一段时间的弱一致性
分布式事务就是围绕我们想要系统维持什么样的一致性来设计的不同的几种方案
可用性[Availability]:集群中部分节点故障后,集群仍然能响应客户端的读写请求;不可用就是系统某个环节出现了需要等待节点修复以后才能使用的情况
分区容错性[Partition tolerance]:不同节点上的服务相互通信需要通过网络,只要网络通信出现了故障就可以认为发生了分区错误,专业的角度来说,一个分布式系统分布在多个子网络上,每个子网络就是一个区;分区容错的意思是区之间的通信可能会失败,比如中国的网络是一个区,美国的网络是一个区,两个区的网络可能无法通信
举一个例子说明CAP理论:
假如一个MySQL主从集群,A节点是主节点,B节点和C节点是从节点,一旦A节点和C节点之间发生网络分区故障,主节点的数据更新就无法同步到从节点上,如果此时还要保证C节点的可用性,就会发生从C节点上读取到的数据都是错误数据的情况,从而导致系统一致性得不到保证;
在分布式系统中我们永远要满足分区容错性,因为网络通信肯定会出现问题,网络出现问题我们还要保证系统运行就是满足分区容错,因此我们就只能选择满足一致性或者选择满足可用性,当发生分区故障,导致不同数据不一致,我们需要根据业务场景选择是牺牲一致性满足可用性[允许部分数据不一致]还是牺牲可用性满足一致性[不允许部分数据不一致而让数据不一致的服务不可用或者直接让系统不可用来代替单个节点的不可用]
因为分区容错无法避免,可以认为分布式系统下CAP理论的P总是成立[因为无法保证网络不中断,除非单体应用且数据库Redis
等都装在一台机器中],CAP理论指出在保证分区容错的情况下,一致性和可用性无法同时做到,只可能同时满足CP或者AP
满足AP即满足分区容错的前提下满足可用性,即让三个节点都正常运行,取到数据不一致无所谓,业务正常执行
满足CP即满足分区容错的前提下满足一致性,即让系统不可用或者让数据错误的节点不可用
分布式系统下实现一致性的Raft
算法、paxos
算法
CAP定理在实际开发中面临的普遍问题是
大型互联网应用场景,主机众多,部署分散,集群规模越来越大;节点故障和网络故障是常态,对商用服务还要充分保障可用性,系统不可用特别像阿里这种基础服务设施短时间的不可用就是特大事故,因此我们还需要保证系统的可用性达到N个9,在很多时候都要保证分区容错和可用性,舍弃掉强一致性;
此时就从CAP理论延伸出BASE理论,核心思想即使我们在保证分区容错和可用性的前提下无法做到强一致性,但是我们可以适当地采取弱一致性,弱一致性就是最终一致性
Base理论
基本可用[Basically Avaliable]:基本可用是指分布式系统在出现故障时,允许损失诸如响应时间、部分功能这样的部分可用性,来保证整个系统的可用性
响应时间损失:正常情况下系统0.5s内响应客户查询请求,在系统部分机房断点或断网的情况下,查询响应时间可以增加到1-2s
功能上损失:电商网站在购物高峰期为了保护系统的稳定性,部分消费者可能会被引导到一个降级页面[服务降级页面,比如一个错误页,Sentinel学过;这个思路学Nginx的时候听说过,秒杀抢购活动实际上早就决定好了哪些请求失败,因为一般这种流量都会提前打开对应活动界面,在活动界面埋点就能预测具体请求数量甚至直接在前端就决定哪些请求直接失败,到时刻以后直接跳转错误页,只有很少的流量到达了上游服务器]
软状态[Soft State]:软状态是相对于强一致状态而言的,强一直是业务操作中的每个操作要么整体成功、要么整体失败,软状态是整个业务操作正在同步中;
典型应用场景就是分布式存储中一份数据一般会有多个副本,允许不同副本的延时同步就是软状态的体现。Mysql
的异步复制即mysql replication
就是软状态的一种体现
最终一致性[Eventual Consistency]:最终一致性是指系统中所有的数据副本经过一定时间后最终能够达到一致的状态[注意核心概念是能够,像不同节点获取到不同的数据并使用该数据继续一泻千里最终得到完全混沌的数据是不能经过一段时间达到一致状态的]
比如在我们的订单服务中,我们使用单体事务控制分布式系统的远程调用事务,库存服务成功扣减库存,但是订单回滚,库存服务无法回滚,我们可以不要求强一致性,即订单创建失败就必须让库存也回滚,我们可以在订单出现异常回滚前的时刻给消息中间件发送一条消息,指定要解锁的库存数量,一段时间后库存服务接收到消息释放解锁对应库存即可
我们可以使用各种手段来保证最终一致性,事缓则圆嘛
Raft
算法
Raft
是一种协议,可以保证分布式系统下的一致性,Raft
的原理演示流程查看http://thesecretlivesofdata.com/raft/
配合老师的课程会理解的更清晰,作用是即使网络出现了分区故障,系统的一致性仍然能得到保障
分布式系统一致性:当数据库只有一个节点时一致性非常容易满足,只需要一个节点更新数据成功后读取到的数据就是最新的数据;关键是在数据库集群场景下如何保证数据库集群整体的数据一致性
主节点选举:Raft
算法把系统中的每个节点分成三种状态,follower从节点、candidate候选者、leader主节点;数据库集群启动时每个节点都是以从节点的状态启动的,如果从节点在一定时间内没有监听到主节点发送来的心跳包就会主动变成候选者状态[变成候选者是为了准备当主节点];候选者会给集群中的每个节点都发起一个投票请求,集群中每个收到请求的从节点都会投票给该候选者,而且从节点只要投过一次票在一次心跳时间内就不会再给后续的投票请求投票;候选者只要收到包含自己在内的大多数节点的同意投票就会变成主节点状态,这就是主节点的选举过程;
Raft中有两个超时时间用来控制主节点选举过程,一个是选举超时时间[超过指定时间没有收到主节点的心跳包当前节点就会成为候选节点,让其他从节点选举候选节点做主节点,选举超时时间一般是150-300ms,也称节点的自旋时间,节点的自旋时间是随机的],一个是心跳超时时间[主节点向从节点发送心跳包的间隔时间,该心跳超时时间一般远小于每个节点的自选时间,只要从节点收到主节点的心跳包就会重置各自的自旋时间来避免成为候选节点],节点内部有一个Term属性记录当前节点发起投票的轮数,候选者也会给自己投一票并且将投票轮数递增1[注意投票轮数从集群启动开始设置为0,只要发起一次投票就会累加1,不会出现重置的情况],其他从节点如果还没有投票就会将票投给该候选者并重置自身的自旋时间,后续的投票请求不会同意投票,只要主节点收到绝大多数从节点的同意投票就会变成主节点,主节点开始给所有从节点通过心跳包发送追加日志,并周期性地向所有从节点发送心跳包,一直维持该状态直到主节点宕机[准确地说是有一个从节点变成了候选者节点,就是有一个节点超过自旋时间没有接收到心跳包],此时又开始重复主节点选举流程,此时新的主节点的数据和旧的主节点数据是一致的
节点宕机必然导致集群内的节点数量变成偶数,此时就可能存在两个节点成为候选节点并只都获取到半数投票[实际上可能存在N个候选节点,且N个候选节点都没有获取到半数以上的投票,概率小,因为自旋时间比较长,远超内网通信时间],此时因为没有候选者节点获取到半数以上投票将无法选举出主节点,注意节点在自旋时间结束后新的自旋周期开启时发起投票请求并重新开始自旋时间计时,各个节点会因为自旋时间内没有成为主节点和没有收到主节点的心跳包而在自旋时间结束后再次发起投票,重复上述过程直到某个节点成功获取到大多数节点的投票,因为必须要获取到大多数节点的投票才能成为主节点,因此系统内只可能存在一个主节点;只可能在网络分区故障多个分区之间无法通信,此时才会存在集群分裂和多个主节点的现象
日志复制:一旦主节点选出来以后,所有对该集群系统的更新都会通过给主节点发送请求来实现,从节点自身不具备更新功能;主节点每一个数据写操作都会被追加为一个节点日志,生成日志时主节点的更新操作还没有提交,此时客户端访问主节点的数据仍然获取的是更新前的数据;主节点生成节点日志后复制节点日志给所有的从节点,从节点收到日志并更新了本地数据后会响应更新成功状态给主节点;主节点只要确认大多数节点更新成功就会直接提交本次更新并再次向所有从节点发起提交请求
每个日志都是通过主节点的心跳包发送出去的,主节点发生写操作追加更新日志,会统一在下一个心跳包中奖日志携带给各个从节点,从节点同步日志并回复主节点,只要大多数节点回复主节点成功同步主节点就会直接提交更新结果并同时将更新成功的状态响应给客户端,在下一个心跳包中向所有从节点发起提交请求,所有从节点提交更新结果并将提交状态响应给主节点
网络分区故障Raft保障数据一致性的原理
假设A、B两个节点处于一个网络分区,C、D、E三个节点处于一个网络分区,两个网络分区发生了网络分区故障,两个分区之间节点的网络通信断掉了;假如原来5个节点的主节点是B节点,一旦出现网络分区,B主节点和另一个网络分区的从节点无法通信,但是因为B主节点所在网络分区和从节点可以正常通信,因此B主节点的状态不会发生变化;C、D、E三个从节点因为收不到主节点的心跳包重新选举主节点,假如选举出主节点E,此时所有节点的数量就变成了3,特别注意此时旧的主节点B因为仍然维持主节点状态,所有节点的数量仍然维持5,只有在选举成为主节点时才会更新集群中的节点数量;
此时相当于两个分区的节点分裂成两个集群,主节点所在集群仍然维持旧的主节点,另一个全是从节点的分区重新选举主节点,旧的主节点可能因为所在分区节点数小于成为主节点时节点数的一半,从而导致旧的主节点所在分区永远无法正确响应客户端的请求,但是另一个分区因为重新选举了主节点重新确定了集群节点数量因此会正常处理所有客户端请求并在一个分区中维持数据一致性
当网络分区故障恢复以后,此时会出现两个主节点,两个主节点都会互相给对方发送心跳包,当主节点发现对方的选举轮数Term的值比自己大,就会皇帝退位,即旧的主节点B发现对方的选举轮数比自己多就会变成从节点,原来B主节点所在分区节点收到两个心跳包并发现有轮数更大的主节点就会将轮数更大的主节点视为新的主节点,B主节点所在分区的节点会将没有提交的数据全部回滚并重新同步新的主节点E的更新日志,此时网络故障恢复,整个集群又处于一致性状态了
这个演示只适合旧的主节点所在网络分区的节点数小于绝大多数节点的情况,此时旧的主节点无法提交更新数据因而整个分区无法处理用户请求从而保障系统数据一致性;但是如果旧的主节点所在网络分区的节点数大于绝大多数节点要求那岂不是也可以提交更新的数据,那岂不是会变成两个集群[因为这种思路逻辑有问题,对上面的情形换一种思路理解,即集群总的节点数量在集群启动时就确定,网络分区一般发生在两个分区之间,两个分区会将可通信的节点数量分成两份,一方节点数量大于一半另一方的节点数量就必然小于一半,即始终只有一个一个集群能成功提供服务,另一个集群因为节点数小于一半而无法提交更新结果从而无法提供服务;但是这也存在问题,因为两个网络分区的存活节点数也可能刚好相同,那这种情况岂不是两个集群都无法对外提供服务系统直接崩溃,这里需要看一下相关论文确认]
raft.github.io
也有一个动画演示,但是都没有展示两个网络分区节点数各占一半情况下由于不满足半数以上节点同步更新导致无法各分区主节点无法提交导致整个系统无法对外提供服务的情况,看论文可能会有收获
这个问题老师后面提了,说这正是CAP定理对Raft算法的约束,即要保证集群的一致性,当两个集群的节点数目无法达到半数以上,虽然基本上节点都存活着,但是此时集群因为主节点无法获取大多数节点的响应而无法提交更新后的数据因此整个集群都无法为客户端提供服务,保证一致性并降低对可用性的影响
业务太大,系统性能、安全和可伸缩性要求太高,我们需要将系统拆分为分布式集群,分布式系统经常会出现机器宕机、网络异常、消息丢失、消息乱序、数据错误、不可靠的TCP、存储数据丢失;
在本项目中,我们拆分出来的服务还单独操作自己的数据库,每个服务还会部署在不同的服务器上,服务之间的远程调用调用方只有获取到被调用方的响应结果以后才能得知远程调用的执行结果,这就可能导致远程被调用服务执行完但是响应的时候出问题,比如执行完响应前的一刻被调用服务的服务器宕机,数据永远无法成功响应,但是实际远程调用已经成功;又比如消息刚发出网络就中断了;又比如系统引入了很多中间件比如消息中间件也可能引入比如消息丢失、类型无法进行转换等等额外的问题;此时调用方不仅不知道远程服务器是否执行成功,甚至要查询远程服务器对应的落盘数据都很困难,因此只要有一个被调用服务出现问题,要在整个业务操作同步状态非常困难
只要是微服务架构系统,分布式事务是每一个分布式系统架构无法避开的东西
分布式事务要实现的一致性是保证多个业务操作的一致性,一个操作失败所有操作失败,所有操作成功整个操作才算成功,而且一个数据库集群中的所有节点都要保持一致性状态
分布式事务常见的解决方案包括2PC模式
2PC模式[2 phase commit,也称二阶段提交]
2PC模式也叫XA Transactions,MySQL从5.5版本开始支持、SQL Server从SQL Server 2005开始支持、Oracle从Oracle 7开始支持
二阶段提交协议的思想是将事务拆分成两个阶段,其中涉及两个对象事务管理器和本地资源管理器[本地资源管理器可以视为每个服务的事务管理器],这里把本地资源管理器看成两个功能不同服务的事务管理器
二阶段提交是将事务分成两个阶段,第一个阶段是准备提交阶段,事务管理器对业务相关的每一个微服务的事务管理器发起请求要求各事务管理器检查本地的事务提交就绪状态[本地数据是否准备好、数据库连接是否正常以及能否正常提交数据],如果各个服务都能正常提交每个服务都会响应就绪状态给事务管理器
第二个阶段是提交阶段,如果事务管理器都成功接收到业务操作涉及到的本地资源管理器的就绪状态确认,就会给每个本地资源管理器发起提交请求,所有本地资源管理器统一提交事务并响应成功状态给事务管理器;一旦有任何一个数据库在预备阶段否决此次提交、所有相关数据库都会被要求回滚本次事务中的那部分数据,事务管理器就会要求所有的本地资源管理器全部进行回滚
XA协议最大的特点是简单,而且基本上商用的数据库都实现了XA协议,使用分布式事务的成本很低;但是注意mysql
数据库对XA协议的支持不是很好,mysql
的XA实现没有记录预备阶段[prepare]的日志,主备数据库切换会导致主库和备库的数据不一致
XA协议最大的问题是性能不理想,特别是交易下单这种并发量很高的服务调用链路,XA协议无法满足高并发场景;很多NoSQL数据库也没有支持XA协议,这就使得XA协议的应用场景变得非常狭隘
除了二阶段提交还有三阶段提交,额外引入了超时机制,无论无论是事务管理器还是本地资源管理器,向对方发起请求后,超过一定时间没有收到回应会执行兜底处理[这里老师说的三阶段提交是将预备阶段分成两个阶段,第一个阶段询问各个本地资源管理器能否正常提交,第二个阶段是本地资源管理器准备数据,第三个阶段就是正常的提交阶段]
XA协议在应用中能解决一些问题,但是应用的不多,主要是了解
柔性事务[TCC模式,也称TCC事务补偿型方案]
这个方案在分布式事务中经常使用,柔性事务是一类保证最终一致性的方案的统称
刚性事务:遵循数据库ACID原则的强一致性要求的事务
柔性事务:遵循BASE理论的最终一致性要求的事务,柔性事务允许一定时间内不同节点的数据不一致,但是最终各个节点内的数据一致
柔性事务流程图
假如有两个数据库,一个数据库对应订单服务,另一个数据库对应库存服务,每个数据库都由对应的服务来进行操作
TCC模式要求开发人员在编写代码的时候在服务中实现三个方法Try
[该方法是准备要提交的数据]、Confirm
[该方法是用于提交数据]、Cancel
[该方法是回滚准备提交的数据,老师这里说的是开发人员前面提交了数据比如数据加2,这个取消方法就要将数据减2]作为可能被回调的方法
第一个阶段,主业务服务[调用各个远程服务的大业务所在服务]命令各个服务调用开发者编写的Try方法来准备数据,同时启动业务活动管理器记录业务操作、并通过业务活动管理器控制提交和回滚业务活动
第二阶段,业务活动管理器命令各个服务调用Confirm方法提交数据
第三个阶段,只要提交过程有任何一个远程调用服务或者主业务服务执行失败,业务活动管理器就会命令所有的远程服务触发开发者自己编写的Cancel方法来做手动回滚补偿,已经成功提交的数据我们再手动进行恢复
这种模式在电商项目中使用的非常多,基于TCC模式实现的事务框架也非常多,只需要按照框架的接口规范把业务方法拆分成三个部分,分别实现数据准备、提交数据、回滚数据的三个方法,框架会自动在特定的节点对三种方法进行调用,这个方案的核心就是出现问题对已经成功提交的数据采用手动补偿的方式来实现回滚
TCC模式相当于3PC模式的手动版,3PC相当于自动准备提交的数据、自动进行提交和自动进行回滚,TCC相当于程序员自己实现准备提交数据、提交、回滚的逻辑
柔性事务[最大努力通知型方案]
主业务比如创建订单业务远程调用库存服务锁库存成功,远程调用订单服务保存订单数据成功,但是比价的时候失败了,此时订单创建失败,注意此时订单数据和库存数据都已经提交;我们可以让主业务给消息队列中的主题交换器发送消息给队列,让所有相关服务都来订阅消息队列,库存服务收到消息去解锁库存,订单服务收到消息去解锁订单
我们害怕消息发出去了但是消息丢失,我们可以逐渐拉长时间间隔给消息队列中发送消息,设置最大消息通知次数,达到最大通知次数就不再发送订单创建失败消息通知;或者服务手动回滚即释放库存即删除订单数据成功了就将消息响应给主业务服务,此时主业务服务收到回滚确认以后就不再向消息队列发送消息
这种多次通知、主业务确认手动回滚的特点也是最大努力通知型方案命名的原因
这种方式适合使用在与第三方系统通讯的场景,比如调用微信或者支付宝支付后的结果通知,各大交易平台间的商户通知、多次通知,查询校对、对账文件、支付宝支付成功后的异步回调等;支付宝付款就是支付成功以后会多次给我们的服务器发送支付成功的消息给我们的订单业务
通过消息队列来实现延时回滚的策略都是通知型方案,保证最终一致性来提升系统的可用性,实际生产中也常使用第三和第四种结合消息队列多次失败通知回滚数据并回复消息生产者的方案
柔性事务[可靠消息+最终一致性方案,也称异步确保型]
这个讲的不清楚,说后面会补充案例说明
大体意思是业务事务提交前,会实时给消息服务保存一份消息数据,但是消息数据在得到确认发送的指令前不会发送给远程调用服务,只有在业务事务提交后才会消息服务发出确认发送指令,这里讲的不清楚,后面结合场景理解一下
老师在这里的解释和上面第三种是一样的,大业务失败给消息队列发送消息,被调用服务收到消息就回滚数据
方案优缺点分析
第三和第四种方案的好处是可以支持大并发场景,订单服务失败只需要发送消息给消息中间件,无需等待其他服务数据回滚就能直接响应用户请求,通过多次发送和回滚验证确认接口来等待远程服务的回滚状态确认,一旦得到确认就停止消息的发送来实现最大努力通知
Seata
分布式事务框架实际是2PC协议[二阶段提交协议]的一个变形,实际上Seata
为用户提供了多种AT[Auto Transaction自动事务模式]、TCC[手动事务补偿模式]、SAGA、XA[两阶段提交]事务模式,这里只演示了AT模式的用法,其他模式的用法可以通过https://github.com/apache/incubator-seata-samples
对应的文件目录下找到对应模式的使用示例和使用步骤说明
Seata
默认的AT
模式是二阶段提交协议的一个演变,和原来的二阶段提交XA模式的区别是,XA模式第一个阶段是准备数据阶段,只会在第二个阶段再提交事务;但是AT模式是第一个阶段业务数据和回滚日志记录就在同一个本地事务中提交并释放本地锁和连接资源,而且各个分支事务提交是异步化的,第二个阶段是回滚的时候通过一阶段提交的回滚日志进行反向补偿
注意导入了Seata
的依赖必须将Seata
服务器的registry.conf
和file.conf
两个文件拷贝到项目的类路径下,否则项目启动会直接报错,因此没有使用Seata
的服务最好不要导入或者排除该依赖
Seata
分布式事务控制原理
术语
TC:事务协调者,作用是维护全局和分支事务状态,驱动全局事务提交或者回滚,通过TC协调各个远程服务是否一起提交事务或者回滚事务,这个TC就类似于XA二阶段协议的事务管理器,主要是作为全局的协调者
TM:事务管理器,定义全局事务范围,开始全局事务、提交或回滚全局事务
RM:资源管理器,资源管理器位于各个服务中,直接和当前服务对应数据库交互,就是类似于单体Spring中使用的@Transactional
三者的整体关系是事务管理器负责开启全局事务,TC事务协调者负责协调全局事务中牵扯的各个分支事务,
工作流程
创建订单业务的事务管理器准备开启一个全局事务,向事务协调者声明开启一个全局事务,事务协调者响应收到;
订单业务所在服务调用远程服务的时候,远程服务的RM资源管理器会向事务协调者声明一个分支事务并且需要实时报告分支事务状态,无论任意一个分支事务提交还是回滚,事务协调者都会实时知道;注意分支事务是在订单业务调用远程服务执行业务代码时开启的,而且调用结束分支事务就已经提交了
远程调用时任意一个分支事务回滚,事务协调器会直接命令其他已经提交过的分支事务也回滚
使用注解@GlobalTransactional
标注在业务方法上即可使用seata
分布式事务
Seata
的使用方法
1️⃣:如果我们使用Seata
的AT自动事务模式需要创建一张数据库表UNDO_LOG
表[回滚日志表],
因为自动事务模式下数据的回滚由Seata
进行控制,每个分支事务在远程调用结束前分支事务就已经提交了,回滚只能通过反向补偿的方式重置数据;使用TCC模式是自己来定义反向补偿的代码,TA自动事务模式就是该代码由Seata
实现和调用,Seata
需要在每一个数据库中都要额外准备一个回滚日志表,回滚日志表记录着此前给哪个数据库表的哪个记录做了什么更新,恢复以前的状态就是对以前的更新操作进行反向补偿,这种方式叫魔改数据库
[回滚日志表]
给每个数据库都要创建下面这张回滚日志表
xxxxxxxxxx
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
2️⃣:从地址https://github.com/seata/seata/releases
下载Seata
服务器软件包,Seata
服务器就是负责全局协调的事务协调者TC
老师下载的是windows的seata-server-0.7.1.zip
,老师说1.0.0
版本的用法和0.x.x
版本的用法不一样
3️⃣:在pom.xml
中导入依赖com.alibaba.cloud:spring-cloud-starter-alibaba-seata
在IDEA项目的External Libraries
中存放着我们引入的第三方依赖,其中的com.alibaba.cloud:spring-cloud-starter-alibaba-seata
即seata
的源码中的GlobalTransactionAutoConfiguration
是seata
全局事务配置
com.alibaba.cloud:spring-cloud-starter-alibaba-seata
依赖于io.seata:seata-all
,这个就是seata
的TC事务协调器对应的依赖,这个依赖的版本必须和seata
的TC服务器[就是seata-server
]版本保持一致,com.alibaba.cloud:spring-cloud-starter-alibaba-seata:2.1.0.RELEASE
对应io.seata:seata-all:0.7.1
xxxxxxxxxx
<!--seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
</dependency>
4️⃣:在windows本机解压seata-server-0.7.1.zip
得到seata-server-0.7.1
,目录结构如下
bin
:seata-server
的命令行目录
seata-server.bat
:双击启动windows
上的事务协调器,seata-server
在注册中心上的服务名是serverAddr
[新版本或者docker
中叫seata-server
],注意seata-server
对JDK
的版本有要求,JDK
版本不对CMD启动会直接报错
conf
:seata-server
配置目录,用户能自定义配置的两个文件是file.conf
和registry.conf
db_store.sql
:使用seata
相关功能涉及到的所有数据库表sql
registry.conf
:注册中心相关配置[有这个配置文件是因为seata
服务器也想把自身注册到注册中心里面作为系统的一部分,registry
是配置注册中心信息的,config
是对seata
服务器进行配置]
registry
支持的注册中心类型被列举在type
字段上方,包含file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
默认是file
,我们使用的是nacos
,需要修改为nacos
serverAddr
:指定nacos
的注册中心服务器地址
config
支持的配置方式都被列举在type
字段上方,包含file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
,file
表示使用本地的配置根目录下的file.conf
做seata
相关配置,nacos
表示使用配置中心上的配置文件来做配置,其他的依次代表使用对应配置中心上的配置文件来做seata
服务器的配置,我们使用file.conf
直接在seata
服务器本地的file.conf
做seata
服务器的相关配置
xxxxxxxxxx
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa指定注册中心
type = "file"
nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
eureka {
serviceUrl = "http://localhost:1001/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3指定配置中心
type = "file"
nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
app.id = "seata-server"
apollo.meta = "http://192.168.1.204:8801"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
file.conf
transport
是seata
的数据传输配置,type="TCP"
表示使用TCP
传输协议,server="NIO"
服务器采用NIO的数据传输模式,heartbeat=true
表示开启服务器心跳,thread-factory
是线程工厂配置
service
是
client
是seata
客户端配置
store
是事务日志存储配置,mode="file"
表示使用的配置方式,默认支持file
[事务日志文件存储在seata
服务器,store
中的file
标签配置事务日志文件存储的目录dir
和日志文件大小]和db
[事务日志文件存储在数据库中,store
中的db
标签配置事务日志文件存储的数据库地址url
,用户名user
和密码password
,全局事务日志表名global.table
、分支事务表名branch.table
和锁表名lock-table
,这三张表就是db_store.sql
中的三张表],使用数据库存储需要去数据库创建对应三张表,我们这里图方便直接在seata
服务器本地存储事务日志文件,什么都不需要管很方便
xxxxxxxxxx
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
#thread factory for netty
thread-factory {
boss-thread-prefix = "NettyBoss"
worker-thread-prefix = "NettyServerNIOWorker"
server-executor-thread-prefix = "NettyServerBizHandler"
share-boss-worker = false
client-selector-thread-prefix = "NettyClientSelector"
client-selector-thread-size = 1
client-worker-thread-prefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
boss-thread-size = 1
#auto default pin or 8
worker-thread-size = 8
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#vgroup->rgroup
vgroup_mapping.my_test_tx_group = "default"
#only support single node
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}
client {
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
}
report.retry.count = 5
}
## transaction log store
store {
## store mode: file、db
mode = "file"
## file store
file {
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
max-branch-session-size = 16384
# globe session size , if exceeded throws exceptions
max-global-session-size = 512
# file buffer size , if exceeded allocate new buffer
file-write-buffer-cache-size = 16384
# when recover batch read size
session.reload.read_size = 100
# async, sync
flush-disk-mode = async
}
## database store
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
db-type = "mysql"
url = "jdbc:mysql://127.0.0.1:3306/seata"
user = "mysql"
password = "mysql"
min-conn = 1
max-conn = 3
global.table = "global_table"
branch.table = "branch_table"
lock-table = "lock_table"
query-limit = 100
}
}
lock {
## the lock store mode: local、remote
mode = "remote"
local {
## store locks in user's database
}
remote {
## store locks in the seata's server
}
}
recovery {
committing-retry-delay = 30
asyn-committing-retry-delay = 30
rollbacking-retry-delay = 30
timeout-retry-delay = 30
}
transaction {
undo.data.validation = true
undo.log.serialization = "jackson"
}
## metrics settings
metrics {
enabled = false
registry-type = "compact"
# multi exporters use comma divided
exporter-list = "prometheus"
exporter-prometheus-port = 9898
}
lib
:
5️⃣:给需要使用分布式事务的业务方法标注全局事务注解@GlobalTransactional
[注意只有调用远程方法的业务方法才需要标注全局事务注解,只是作为被调用方法只需要标注本地事务即可],注意每个服务内部的方法本地控制事务仍然要标注本地事务注解@Transactional
老师说的只要给分布式大事务标注全局事务注解@GlobalTransactional
注解就行,每个远程小事务都使用本地事务注解@Transactional
,但是我这里有疑问,如果远程调用方法调用了远程服务,那么这个远程服务是否还需要标注全局事务注解@GlobalTransactional
@GlobalTransactional
注解中配置了事务超时时间timeoutMills
,要回滚的异常rollbackFor
,无需回滚的异常noRollbackFor
其他步骤按照文档地址https://seata.apache.org/zh-cn/docs/user/quickstart/
补足剩余操作,
在https://github.com/apache/incubator-seata-samples/tree/master/at-sample
中有seata
与各种场景下比如dubbo、MyBatis、springcloud-jpa-seata
的整合示例,在每个服务实例中的README.md
中是对应应用场景的使用方法介绍,以springcloud-jpa-seata
即springcloud-jpa
应用场景整合seata
为例
除去介绍的在快速开始已经完成的准备工作,还需要额外注入一个DataSourceProxy
容器组件,该组件是seata
的代理数据源,seata
想要控制住事务需要通过seata
包装默认的数据源让seata
来代理数据源才能实现使用seata
控制事务的目的
6️⃣:所有使用seata
分布式事务的微服务都需要使用seata
的DataSourceProxy
代理默认的数据源
具体实现是手动给数据库注入一个默认要使用的数据源,然后通过该数据源组件再创建注入一个数据源代理对象DataSourceProxy
组件,并使用@Primary
注解将数据源代理对象作为主数据源,注意下面这个配置在SpringBoot
低版本可用,但是在SpringBoot2.0
以后容易引起循环引入异常
注意:在Seata0.9
版本以后,提供了DataSource
默认代理的功能,并且默认是开启的,不用再手动的去把DataSource
放入到DataSourceProxy
中了
xxxxxxxxxx
public class DataSourceConfig {
prefix = "spring.datasource") (
public DruidDataSource druidDataSource() {
return new DruidDataSource();
}
/**
* 需要将 DataSourceProxy 设置为主数据源,否则事务无法回滚
*
* @param druidDataSource The DruidDataSource
* @return The default datasource
*/
"dataSource") (
public DataSource dataSource(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
}
SpringBoot
默认数据源配置DataSourceAutoConfiguration
xxxxxxxxxx
/**
* {@link EnableAutoConfiguration Auto-configuration} for {@link DataSource}.
*
* @author Dave Syer
* @author Phillip Webb
* @author Stephane Nicoll
* @author Kazuki Shimizu
* @since 1.0.0
*/
DataSource.class, EmbeddedDatabaseType.class })//@ConditionalOnClass注解表示只要系统内有数据源DataSource的子实现类就会默认开启数据源的自动配置 ({
DataSourceProperties.class)//@EnableConfigurationProperties表示开启属性配置类DataSourceProperties的属性绑定功能,并将该配置类注入到容器 (
DataSourcePoolMetadataProvidersConfiguration.class, DataSourceInitializationConfiguration.class }) ({
public class DataSourceAutoConfiguration {
(EmbeddedDatabaseCondition.class)
({ DataSource.class, XADataSource.class })
(EmbeddedDataSourceConfiguration.class)
protected static class EmbeddedDatabaseConfiguration {
}
//@Import是导入配置文件,SpringBoot默认就是使用的Hikari数据源,SpringBoot默认就是通过这里的DataSourceConfiguration.Hikari.class导入的数据源配置
(PooledDataSourceCondition.class)
({ DataSource.class, XADataSource.class })
({ DataSourceConfiguration.Hikari.class, DataSourceConfiguration.Tomcat.class,
DataSourceConfiguration.Dbcp2.class, DataSourceConfiguration.Generic.class,
DataSourceJmxConfiguration.class })1️⃣ //默认的数据源配置类会通过@Import注解给SpringBoot默认的数据源HikariDataSource做配置
protected static class PooledDataSourceConfiguration {
}
/**
* {@link AnyNestedCondition} that checks that either {@code spring.datasource.type}
* is set or {@link PooledDataSourceAvailableCondition} applies.
*/
static class PooledDataSourceCondition extends AnyNestedCondition {
PooledDataSourceCondition() {
super(ConfigurationPhase.PARSE_CONFIGURATION);
}
(prefix = "spring.datasource", name = "type")
static class ExplicitType {
}
(PooledDataSourceAvailableCondition.class)
static class PooledDataSourceAvailable {
}
}
/**
* {@link Condition} to test if a supported connection pool is available.
*/
static class PooledDataSourceAvailableCondition extends SpringBootCondition {
public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) {
ConditionMessage.Builder message = ConditionMessage.forCondition("PooledDataSource");
if (getDataSourceClassLoader(context) != null) {
return ConditionOutcome.match(message.foundExactly("supported DataSource"));
}
return ConditionOutcome.noMatch(message.didNotFind("supported DataSource").atAll());
}
/**
* Returns the class loader for the {@link DataSource} class. Used to ensure that
* the driver class can actually be loaded by the data source.
* @param context the condition context
* @return the class loader
*/
private ClassLoader getDataSourceClassLoader(ConditionContext context) {
Class<?> dataSourceClass = DataSourceBuilder.findType(context.getClassLoader());
return (dataSourceClass != null) ? dataSourceClass.getClassLoader() : null;
}
}
/**
* {@link Condition} to detect when an embedded {@link DataSource} type can be used.
* If a pooled {@link DataSource} is available, it will always be preferred to an
* {@code EmbeddedDatabase}.
*/
static class EmbeddedDatabaseCondition extends SpringBootCondition {
private final SpringBootCondition pooledCondition = new PooledDataSourceCondition();
public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) {
ConditionMessage.Builder message = ConditionMessage.forCondition("EmbeddedDataSource");
if (anyMatches(context, metadata, this.pooledCondition)) {
return ConditionOutcome.noMatch(message.foundExactly("supported pooled data source"));
}
EmbeddedDatabaseType type = EmbeddedDatabaseConnection.get(context.getClassLoader()).getType();
if (type == null) {
return ConditionOutcome.noMatch(message.didNotFind("embedded database").atAll());
}
return ConditionOutcome.match(message.found("embedded database").items(type));
}
}
}
1️⃣
/**
* Actual DataSource configurations imported by {@link DataSourceAutoConfiguration}.
*
* @author Dave Syer
* @author Phillip Webb
* @author Stephane Nicoll
*/
abstract class DataSourceConfiguration {
("unchecked")
protected static <T> T createDataSource(DataSourceProperties properties, Class<? extends DataSource> type) {
return (T) properties.initializeDataSourceBuilder().type(type).build();//在实例化数据源的时候内部类会通过方法名来调用DataSourceConfiguration的createDataSource方法,实际调用的是数据源配置绑定类中的实例方法dataSourceProperties.initializeDataSourceBuilder().type(type).build()来创建的,并且在type方法中传参要实例化的数据源对象类型,我们可以学习这个方法直接抄这个数据源初始化代码来自己创建一个数据源组件,并使用该数据源组件在系统启动时做一些额外的操作,比如将我们创建的数据源用一个数据源代理对象来进行包装进而控制分布式全局事务
}
/**
* Tomcat Pool DataSource configuration.
*/
(proxyBeanMethods = false)
(org.apache.tomcat.jdbc.pool.DataSource.class)
(DataSource.class)
(name = "spring.datasource.type", havingValue = "org.apache.tomcat.jdbc.pool.DataSource",
matchIfMissing = true)
static class Tomcat {
(prefix = "spring.datasource.tomcat")
org.apache.tomcat.jdbc.pool.DataSource dataSource(DataSourceProperties properties) {
org.apache.tomcat.jdbc.pool.DataSource dataSource = createDataSource(properties,
org.apache.tomcat.jdbc.pool.DataSource.class);
DatabaseDriver databaseDriver = DatabaseDriver.fromJdbcUrl(properties.determineUrl());
String validationQuery = databaseDriver.getValidationQuery();
if (validationQuery != null) {
dataSource.setTestOnBorrow(true);
dataSource.setValidationQuery(validationQuery);
}
return dataSource;
}
}
/**
* Hikari DataSource configuration.
*/
(proxyBeanMethods = false)
(HikariDataSource.class)//@ConditionalOnClass在项目中有HikariDataSource这个类才会执行该配置类
(DataSource.class)//@ConditionalOnMissingBean注解的意思是当容器中没有数据源的时候才会执行该配置类给容器注入Hikari数据源
(name = "spring.datasource.type", havingValue = "com.zaxxer.hikari.HikariDataSource",
matchIfMissing = true)
static class Hikari {
//给容器中添加数据源组件HikariDataSource,组件的所有配置都以spring.datasource.hikari作为前缀,配置项和配置类DataSourceProperties绑定
(prefix = "spring.datasource.hikari")
HikariDataSource dataSource(DataSourceProperties properties) {
HikariDataSource dataSource = createDataSource(properties, HikariDataSource.class);//默认数据源是通过DataSourceConfiguration的createDataSource(properties, HikariDataSource.class);方法创建的,注意啊,静态内部类可以直接通过方法名调用外部类的静态方法,这个写法以前总结过
if (StringUtils.hasText(properties.getName())) {
dataSource.setPoolName(properties.getName());
}//创建数据源以后判断属性绑定类中是否有name属性,如果有name属性就使用该name属性给数据源池设置名字,我们也模仿这种方式来自定义一个数据源组件
return dataSource;
}
}
/**
* DBCP DataSource configuration.
*/
(proxyBeanMethods = false)
(org.apache.commons.dbcp2.BasicDataSource.class)
(DataSource.class)
(name = "spring.datasource.type", havingValue = "org.apache.commons.dbcp2.BasicDataSource",
matchIfMissing = true)
static class Dbcp2 {
(prefix = "spring.datasource.dbcp2")
org.apache.commons.dbcp2.BasicDataSource dataSource(DataSourceProperties properties) {
return createDataSource(properties, org.apache.commons.dbcp2.BasicDataSource.class);
}
}
/**
* Generic DataSource configuration.
*/
(proxyBeanMethods = false)
(DataSource.class)
(name = "spring.datasource.type")
static class Generic {
DataSource dataSource(DataSourceProperties properties) {
return properties.initializeDataSourceBuilder().build();
}
}
}
通过模仿SpringBoot
初始化数据源的方式来初始化数据源并使用seata
的数据源代理对象来包装数据源组件
注意导入了数据源代理对象,该代理对象中保存了数据源HikariDataSource
的信息,因为系统中有了DataSource
,因此默认的HikariDataSource
组件不会再自动注入了
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 自定义seata配置
* @创建日期 2024/11/24
* @since 1.0.0
*/
public class CustomSeataConfig {
public DataSource dataSource(DataSourceProperties properties){
HikariDataSource dataSource = properties.initializeDataSourceBuilder().type(HikariDataSource.class).build();
if(StringUtils.hasText(properties.getName())){
dataSource.setPoolName(properties.getName());
}
return new DataSourceProxy(dataSource);
}
}
7️⃣:将seata
服务器下的conf
目录下的registry.conf
和file.conf
给每个服务的类路径下都拷贝一份
将file.conf
文件中的service.vgroup_mapping
配置更改成和当前服务的spring.application.name
保持一致即service.vgroup_mapping.mall-order-fescar-service-group="default"
[在 org.springframework.cloud:spring-cloud-starter-alibaba-seata
的org.springframework.cloud.alibaba.seata.GlobalTransactionAutoConfiguration
类中,默认会使用 ${spring.application.name}-fescar-service-group
作为服务名注册到Seata-Server
上,如果和file.conf
中的配置不一致,会提示 no available server to connect
错误,我们也可以通过配置 spring.cloud.alibaba.seata.tx-service-group
修改这个默认后缀,但是该配置必须和file.conf
中的配置service.vgroup_mapping.xxx
这个xxx
保持一致]
Seata
的局限性
Seata
的AT
模式不适用于高并发场景,适合使用在保存商品信息这种并发量不高的场景,保存商品信息需要远程调用库存服务、优惠券服务;此时就适合使用Seata
来做分布式事务控制;就是Seata
的AT
模式适合用在后台管理系统这种并发量不太高的场景做分布式事务控制
像下单这种典型的高并发场景就不适合使用Seata
的AT
模式,Seata
的AT
模式在事务进行期间要获取全局锁、会将全局事务的业务变成串行执行,所有人都需要等待上一个订单创建完才能执行创建下一个订单,这样系统就没法使用了,因此高并发场景下一般不考虑使用XA二阶段提交模式,也不会考虑TCC手动事务补偿模式;
高并发场景下更多的考虑基于可靠消息投递加最终一致性的异步确保型的最大努力通知型方案,因此我们的订单服务不使用Seata
分布式事务解决方案,而选择使用柔性事务中的可靠消息投递+最终一致性的异步确保型方案
即使用Seata
来控制分布式事务提交回滚效率极低,为了保证高并发,我们下订单通过软一致性让订单创建服务出现问题由本地事务控制回滚,在出现异常回滚的同时我们给消息中间件发送消息通知库存服务对锁定的库存通过反向补偿的方式进行回滚,订单服务只需要给消息队列发送消息,无需等待多个远程调用回滚完毕,即订单创建服务的性能损失几乎没有
我们给库存服务专门设置一个解锁库存的业务,库存解锁发起方给消息中间件对应库存服务的专门存储解锁库存消息的队列发送解锁库存消息,库存服务监听到解锁库存消息就在后台自己去慢慢地解锁库存,无需保证强一致,只需要保证一段时间后最终一致即可
❓:seata目前在AT模式下不支持批量插入记录,也不支持MP的addBatch方法,AT模式下只能一条一条数据循环遍历来插入,很消耗数据库性能https://blog.csdn.net/qq_33240556/article/details/140790581
这种方案不仅能保证分布式系统下的最终一致性,还能保证并发性能,还能适应异构服务系统下的业务协作
订单业务逻辑
带回滚的锁库存逻辑
数据库mall-sms
中的表sms_stock_order_task
记录着当前那个订单正在锁库存,在表sms_stock_order_task_detail
表中记录着商品id、锁定库存的数量、锁定库存所在仓库id、订单任务号;即锁库存的时候先给数据库保存要锁定的库存记录,然后再锁定库存;只要锁定库存成功,库存相关的三张表因为本地事务都会成功保存锁库存记录;如果锁失败了数据库因为事务不会有锁库存记录,库存也不会锁定成功
这样库存锁定表中存在的就是下单成功锁定库存的记录和下单失败但是锁定库存成功的记录,我们可以考虑使用一个定时任务,每隔一段时间就扫描一次数据库,检查一下哪些订单没有创建但是有锁定库存的记录,把这些锁定库存记录拿出来重新把库存补偿回滚一下,但是使用定时任务来定期扫描整个数据库表是很麻烦的一件事,我们通过引入延时消息队列来实现定时功能
延时队列的原理是当库存锁定成功以后我们将库存锁定成功的消息发送给延时队列,但是在一定时间内消息被暂存在延时队列中不要往外发送,即锁定库存成功的消息暂存在延时队列中一段时间,在订单支付时间过期以后我们将该消息发送给解锁库存服务,解锁库存服务去检查订单是否被取消,如果订单根本没有创建或者因为订单未支付而被自动取消了就去数据库根据对应的锁库存记录将库存解锁,即锁定的库存在订单最大失效时间以后才固定进行解锁,通过延时消息队列来控制这个定时功能
延时队列锁库存的业务流程
1️⃣:创建订单锁定库存成功后,给主题交换器stock-event-exchange
发送库存工作单消息,消息的路由键stock.locked
,被主题交换器路由到队列stock.delay.queue
中等待50分钟过期时间
2️⃣:库存工作单在消息存活时间到期以后,队列将消息的路由键更改为stock.release
,通过主题交换器stock-event-exchange
将消息路由到队列stock.release.stock.queue
,由该队列将消息转发给消费者库存服务
3️⃣:消费者库存服务收到库存工作单消息,检查订单服务对应订单的状态,如果订单未被成功创建或者订单未支付就解锁被锁定的库存
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 库存服务RabbitMQ配置
* @创建日期 2024/12/02
* @since 1.0.0
*/
public class StockRabbitMQConfig {
/**
* @return {@link MessageConverter }
* @描述 给容器中注入一个使用Jackson将消息对象序列化为json对象的消息转换器
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/01
* @since 1.0.0
*/
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* @return {@link Queue }
* @描述 库存延迟队列延时队列
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Queue stockDelayQueue(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange","stock-event-exchange");
arguments.put("x-dead-letter-routing-key","stock.release");
arguments.put("x-message-ttl",120000);
return new Queue("stock.delay.queue", true, false, false, arguments);
}
/**
* @return {@link Queue }
* @描述 库存延迟队列路由队列
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Queue stockReleaseStockQueue(){
return new Queue("stock.release.stock.queue",true,false,false);
}
/**
* @return {@link Exchange }
* @描述 库存服务通用主题交换器
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Exchange stockEventExchange(){
return new TopicExchange("stock-event-exchange",true,false);
}
/**
* @return {@link Binding }
* @描述 延迟队列的延时队列stock.delay.queue和库存服务通用交换器stock-event-exchange的绑定关系
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Binding orderCreateOrderBinding(){
return new Binding("stock.delay.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.locked",
null);
}
/**
* @return {@link Binding }
* @描述 延迟队列的路由队列stock.release.stock.queue和库存服务通用交换器stock-event-exchange的绑定关系
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Binding orderReleaseOrderBinding(){
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.release.#",
null);
}
}
解锁库存逻辑
需要解锁库存的场景:
创建订单成功,订单过期没有支付被系统自动取消或者订单被用户手动取消时需要解锁库存
创建订单过程中,远程调用库存服务锁定库存成功,但是调用其他服务时出现异常导致创建订单整个业务回滚,之前成功锁定的库存就需要自动解锁来实现回滚,使用Seata
分布式事务性能太差,不适合下单这种高并发场景;基于柔性事务的可靠消息加最终一致性的分布式事务方案,在保证分布式事务下的性能同时,允许一定时间内的软一致性并确保库存数据的最终一致性
只要库存锁定成功就给RabbitMQ中对应的库存延迟队列发送库存工作单消息,使用RabbitTemplate.convertAndSend()
发送锁定库存消息,同时锁定库存以前我们要保存库存工作单信息[对应表wms_ware_order_task
,保存订单Id、订单号],锁定存库成功以后要保存库存工作单详情[对应表wms_ware_order_task_detail
,给该表添加bigint
类型字段ware_id
锁定库存所在仓库id;int
类型的lock_status
,其中1表示已锁定、2表示已解锁、3表示已扣减;注意MP更改了字段需要更改相应的Mapper文件中的resultMap
标签;保存商品sku
、商品数量、库存工作单id、锁定库存所在仓库id、默认锁定状态是已锁定1],锁定库存前先保存库存工作单,保存库存工作单是为了追溯锁定库存信息
给消息队列发送的消息实体类直接写在common包下,该消息对象StockLockedTo
保存库存工作单id、该工作单下所有工作单详情id列表,老师这里发送消息的时机错了,所有商品都锁定成了才给消息队列发送消息,否则本地事务会自动回滚,老师是锁定一个商品就发送一条消息,如果事务回滚了发出去的消息就撤不回来了,而且老师这里发送的消息是全量的库存工作单详情数据
[消息]
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 库存被成功锁定消息
* @创建日期 2024/12/02
* @since 1.0.0
*/
public class StockLockedMessage {
/**
* 库存工作单
*/
private WareOrderTaskEntity wareOrderTask;
/**
* 库存工作单详情列表
*/
private List<WareOrderTaskDetailEntity> wareOrderTaskDetail;
}
[锁定库存保存库存工作单并发送消息]
xxxxxxxxxx
/**
* @param lockStock
* @return {@link List }<{@link LockStockResultTo }>
* @描述 根据订单和订单项数据锁定库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/01
* @since 1.0.0
*/
public List<LockStockResultTo> lockStock(LockStockTo lockStock) throws RRException{
//2. 锁定库存前创建库存工作单
WareOrderTaskEntity wareOrderTask = new WareOrderTaskEntity();
BeanUtils.copyProperties(lockStock,wareOrderTask);
wareOrderTaskService.save(wareOrderTask);
//1. 锁定库存
List<LockStockTo.OrderItem> orderItems = lockStock.getOrderItems();
List<Long> skuIds = orderItems.stream().map(LockStockTo.OrderItem::getSkuId).collect(Collectors.toList());
Map<Long, LockStockTo.OrderItem> orderItemOfSku = orderItems.stream().collect(Collectors.toMap(LockStockTo.OrderItem::getSkuId, orderItem -> orderItem));
//查询商品所在的全部仓库
List<WareIdsOfSkuIdTo> wareIdsOfSkuIdTos=baseMapper.getWareBySkuIds(skuIds);
if(wareIdsOfSkuIdTos.size()!=skuIds.size()){
throw new RRException(StatusCode.NO_STOCK_EXCEPTION.getMsg(),
StatusCode.NO_STOCK_EXCEPTION.getCode());
}
ArrayList<LockStockResultTo> lockStockResults = new ArrayList<>();
ArrayList<WareOrderTaskDetailEntity> wareOrderTaskDetails = new ArrayList<>();
for (WareIdsOfSkuIdTo wareIdsOfSkuIdTo : wareIdsOfSkuIdTos) {
Long skuId = wareIdsOfSkuIdTo.getSkuId();
String[] wareIds = wareIdsOfSkuIdTo.getWareIds().split(",");
Boolean skuLocked = false;
for (String wareId : Arrays.asList(wareIds)) {
if(baseMapper.tryLockStock(wareId,skuId,orderItemOfSku.get(skuId).getSkuQuantity())==1){
//准备锁定库存的响应数据
LockStockResultTo lockStockResult = new LockStockResultTo();
skuLocked = true;
lockStockResult.setLocked(true);
lockStockResult.setLockQuantity(orderItemOfSku.get(skuId).getSkuQuantity());
lockStockResult.setWareId(Long.parseLong(wareId));
lockStockResult.setSkuId(skuId);
lockStockResults.add(lockStockResult);
//准备库存工作单详情
WareOrderTaskDetailEntity wareOrderTaskDetail = new WareOrderTaskDetailEntity();
wareOrderTaskDetail.setTaskId(wareOrderTask.getId());
wareOrderTaskDetail.setSkuId(skuId);
wareOrderTaskDetail.setSkuName(orderItemOfSku.get(skuId).getSkuName());
wareOrderTaskDetail.setSkuNum(orderItemOfSku.get(skuId).getSkuQuantity());
wareOrderTaskDetail.setWareId(Long.parseLong(wareId));
wareOrderTaskDetails.add(wareOrderTaskDetail);
break;
}
}
if(!skuLocked){
throw new RRException("商品"+skuId+StatusCode.NO_STOCK_EXCEPTION.getMsg(),
StatusCode.NO_STOCK_EXCEPTION.getCode());
}
}
//for (WareOrderTaskDetailEntity wareOrderTaskDetail : wareOrderTaskDetails) {
// wareOrderTaskDetailService.save(wareOrderTaskDetail);
//}
wareOrderTaskDetailService.saveBatch(wareOrderTaskDetails);
StockLockedMessage stockLockedMessage = new StockLockedMessage();
stockLockedMessage.setWareOrderTask(wareOrderTask);
stockLockedMessage.setWareOrderTaskDetail(wareOrderTaskDetails);
rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",stockLockedMessage);
return lockStockResults;
}
使用@RabbitHandler
监听锁定库存消息队列,获取到消息对象,按以下情况执行解锁库存逻辑
1️⃣:创建订单过程中,库存锁定成功,但是接下来创建订单出现问题,整个订单回滚,被锁定的库存需要自动解锁
只要库存工作单详情存在,就说明库存锁定成功,此时我们就要看订单状态来判断是否需要解锁库存,如果订单都没有说明订单没有被成功创建,此时就要使用库存工作单详情来解锁库存
如果有订单查看订单状态,如果订单已经被取消就解锁库存,只要订单没有被取消就不能解锁库存,订单被取消字段status
等于4
根据库存工作单的id去订单服务查询订单实体类,如果订单不存在或者订单的status
字段为4就调用unLockStock
方法解锁库存
2️⃣:订单创建失败是由于库存锁定失败导致的
库存工作单数据没有是库存本地事务整体回滚导致的,库存工作单记录不会创建,锁定库存操作也会全部自动回滚,这种情况无需解锁
解锁库存需要知道商品的skuId
、锁定库存所在仓库id、锁定库存数量、库存工作单详情id
,解锁就是将原来的增加的锁定库存的字段再减掉UPDATE wms_ware_sku SET stock_locked = stock_locked + #{num} WHERE sku_id=#{skuId} AND ware_id = #{wareId}
🚁:自动应答的消息队列,一旦在消息的消费过程中出现异常导致消息无法被正常消费,消息就丢失了,比如Feign远程调用网络闪断,或者远程服务的Feign调用必须携带用户的登录状态但是实际请求没有携带用户登录状态被远程服务拦截,抛出异常终止后续方法执行,此时消息就彻底丢失了
📓:使用配置spring.rabbitmq.listener.simple.acknowledge-mode=manual
开启消息接收手动应答,在订单解锁成功以后使用方法channel.basicAck(message.getMessageProperties().getDeliveryTag(),false)
来做消息接收手动应答,解锁成功立马手动应答,无需解锁什么也不用做立马手动应答,只要在应答后发生异常也不会导致消息丢失;如果远程调用没有成功返回在库存解锁以前出现问题,我们使用方法channel.basicReject(message.getMessageProperties().getDeliveryTag(),true)
来手动拒绝消息并将消息重新放到队列中,给别人继续消费消息解锁库存的机会,比如由于分区故障为了保证一致性部分服务不可用
❓:订单服务所有远程调用请求都要求有登录状态,但是我们的消息队列监听方法的远程调用不可能带用户登录状态,因此我们需要在订单服务的拦截器中放行所有消息队列监听方法调用的订单服务远程接口,特别注意,这个需要被放行的请求路径还拼接了请求参数导致请求路径是动态的
🔑:我们通过在拦截器中放行指定URI的请求来实现这个目的,对于URI是变化的我们使用Spring
提供的boolean match = AntPathMatcher.match("/order/order/status/**",request.getRequestURI())
[HttpServletRequest.getRequestURI()
是获取请求路径的URI,HttpServletRequest.getRequestURL()
是获取请求路径的URL],如果请求URI匹配我们需要的格式就直接通过拦截器的return true
放行,无需再进行用户登录状态检查
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 用户登录状态拦截器
* @创建日期 2024/11/09
* @since 1.0.0
*/
public class LoginStatusInterceptor implements HandlerInterceptor {
public static ThreadLocal<UserBaseInfoVo> loginUser=new ThreadLocal<>();
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
AntPathMatcher antPathMatcher = new AntPathMatcher();
if(antPathMatcher.match("/order/order/get/**", request.getRequestURI())){
return true;
}
UserBaseInfoVo attribute = (UserBaseInfoVo)request.getSession().getAttribute(MallConstant.SESSION_USER_LOGIN_STATUS_KEY);
if(attribute!=null){
loginUser.set(attribute);
return true;
}else{
request.getSession().setAttribute("tip","请先登录");
response.sendRedirect("http://auth.earlmall.com/login.html");
return false;
}
}
}
专门抽取一个消息队列的监听器Service来处理消息队列中的消息
在类上标注@RabbitListener(queues="stock.release.stock.queue")
来监听指定队列,在类上标注@Service
注解将该类的实例化对象作为容器组件,在具体的方法上标注注解@RabbitHandler
,在该方法中调用库存服务实现的解锁库存逻辑,解锁库存的方法出现任何异常都手动拒绝消息并重新入队列,只要解锁库存方法成功调用就手动应答接收消息,远程调用如果状态码不是0说明没有查到对应订单的实体类,此时直接抛异常执行拒绝接收消息的逻辑
[监听队列消息]
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 库存释放队列监听器
* @创建日期 2024/12/02
* @since 1.0.0
*/
queues = "stock.release.stock.queue") (
public class StockReleaseQueueListenerImpl extends ServiceImpl<MessageDao, MessageEntity> implements StockReleaseQueueListener {
private WareSkuService wareSkuService;
/**
* @param message
* @param stockLockedMessage
* @param channel
* @描述 根据到期的锁定库存消息释放库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/02
* @since 1.0.0
*/
public void tryReleaseStock(Message message, StockLockedMessage stockLockedMessage, Channel channel) {
try{
wareSkuService.tryReleaseStock(stockLockedMessage.getWareOrderTask().getOrderSn());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
} catch (Exception exception) {
MessageEntity messageLog = new MessageEntity();
messageLog.setMessageStatus(2);
messageLog.setClassType(stockLockedMessage.getClass().getTypeName());
messageLog.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
messageLog.setToExchange(message.getMessageProperties().getReceivedExchange());
messageLog.setContent(JSON.toJSONString(stockLockedMessage));
save(messageLog);
}
}
}
}
[解锁库存]
xxxxxxxxxx
/**
* @param orderSn
* @描述 根据库存锁定消息尝试解锁库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/02
* @since 1.0.0
*/
public void tryReleaseStock(String orderSn) {
//1. 根据订单号远程查询订单服务对应的订单状态,如果订单不存在或者订单状态不是已关闭就不解锁库存
R res = orderFeignClient.getOrderByOrderSn(orderSn);
OrderTo order = res.get("order", new TypeReference<OrderTo>() {});
if(order==null && order.getStatus()==4){
return;
}
//2. 根据订单号查询库存工作单id
WareOrderTaskEntity wareOrderTaskEntity = wareOrderTaskService.getOne(new QueryWrapper<WareOrderTaskEntity>().
eq("order_sn", orderSn));
if(wareOrderTaskEntity==null){
return;
}
List<WareOrderTaskDetailEntity> wareOrderTaskDetails = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>().
eq("task_id", wareOrderTaskEntity.getId()));
for (WareOrderTaskDetailEntity wareOrderTaskDetail : wareOrderTaskDetails) {
//检索库存工作单状态,只有库存工作单状态为未解锁时才解锁库存,如果不为未解锁本条库存不解锁
if(wareOrderTaskDetail.getLockStatus()!=2){
baseMapper.releaseStock(wareOrderTaskDetail.getSkuNum(),wareOrderTaskDetail.getSkuId(),wareOrderTaskDetail.getWareId());
//解锁后将库存工作单状态更改为已解锁
wareOrderTaskDetail.setLockStatus(2);
wareOrderTaskDetailService.updateById(wareOrderTaskDetail);
}
}
}
解锁库存成功后通过库存工作单详情id将库存工作单详情的状态lock_status
更改为已解锁2,增加前面解锁库存的条件只有库存工作单详情为已锁定状态且需要解锁时才能解锁库存
带回滚的锁库存实现
给库存服务mall-ware
引入、配置RabbitMQ
并在主启动类上标注@EnableRabbit
开启RabbitMQ
功能
配置RabbitMQ
的消息的JSON
序列化机制
给库存服务添加一个默认交换器stock-event-exchange
交换器使用Topic
交换器类型,因为该交换器需要绑定多个队列,而且还需要使用对不同消息的路由键进行模糊匹配的功能
给库存服务添加一个释放库存队列stock.release.stock.queue
,支持持久化,不支持排他和自动删除,普通队列不需要设置参数
给库存服务添加一个延迟库存工作单消息的队列stock.delay.queue
,给该延时队列设置死信交换器stock-event-exchange
,设置死信的路由键为stock.release
,设置队列的消息存活时间为120秒[方便测试用的,比验证订单创建的延迟队列多一分钟],支持持久化,不支持排他和自动删除
给库存服务的库存释放队列和交换器添加一个绑定关系,绑定目的地stock.release.stock.queue
,交换器stock-event-exchange
,绑定键stock.release.#
给库存服务的库存延时队列和交换器添加一个绑定关系,绑定目的地stock.delay.queue
,交换器stock-event-exchange
,绑定键stock.delay.#
监听一个队列让以上所有容器组件都通过SpringBoot
自动去RabbitMQ
中检查创建
取消订单逻辑
订单服务队列和交换器组件和绑定关系
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 自定义RabbitMQ配置
* 1. 使用@Bean注解注入容器的队列、交换器、绑定关系如果在RabbitMQ服务器中没有SpringBoot会自动在RabbitMQ服务器中进行创建
* @创建日期 2024/11/26
* @since 1.0.0
*/
public class CustomRabbitMQConfig {
/**
* @return {@link Queue }
* @描述 订单延迟队列延时队列
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Queue orderDelayQueue(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange","order-event-exchange");
arguments.put("x-dead-letter-routing-key","order.release.order");
arguments.put("x-message-ttl",60000);
return new Queue("order.delay.queue", true, false, false, arguments);
}
/**
* @return {@link Queue }
* @描述 订单延迟队列路由队列
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Queue orderReleaseOrderQueue(){
return new Queue("order.release.order.queue",true,false,false);
}
/**
* @return {@link Exchange }
* @描述 订单服务通用主题交换器
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Exchange orderEventExchange(){
return new TopicExchange("order-event-exchange",true,false);
}
/**
* @return {@link Binding }
* @描述 延迟队列的延时队列order.delay.queue和订单服务通用交换器order-event-exchange的绑定关系
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Binding orderCreateOrderBinding(){
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null);
}
/**
* @return {@link Binding }
* @描述 延迟队列的路由队列order.release.order.queue和订单服务通用交换器order-event-exchange的绑定关系
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/26
* @since 1.0.0
*/
public Binding orderReleaseOrderBinding(){
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}
public Binding orderReleaseStockBinding(){
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.other.#",
null);
}
}
订单创建成功就给交换器order-event-exchange
发送消息,消息路由键order.create.order
,保存的消息是OrderCreateTO.getOrder()
,消息会被交换器路由到order.delay.queue
[队列中的消息延时时间为30min],消息变成死信后路由键配置成order.release.order
并将消息转发到order-event-exchange
路由到order.release.order.queue
被订单服务监听,订单服务监听接收取消订单并将消息转发以路由键order.release.other
通过交换器order-event-exchange
,队列将消息转发给订单服务
[订单服务创建订单发起消息]
xxxxxxxxxx
/**
* @param orderParam
* @return {@link PayVo }
* @描述 创建订单
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/30
* @since 1.0.0
*/
//@GlobalTransactional
public PayVo createOrder(OrderSubmitParamVo orderParam) throws RRException{
UserBaseInfoVo userBaseInfo = LoginStatusInterceptor.loginUser.get();
String orderSn = IdWorker.getTimeId();
Long userId = userBaseInfo.getId();
//1. 校验订单提交令牌,校验失败抛出表单重复提交异常
if(!verifyOrderSubmitToken(userId,orderParam.getToken())){
throw new RRException(StatusCode.FORM_REPEAT_EXCEPTION.getMsg(),
StatusCode.FORM_REPEAT_EXCEPTION.getCode());
}
//2. 创建订单
OrderEntity order = createOrder(userId, orderParam.getAddrId(),orderSn);
//3. 创建订单项列表
//远程调用购物车服务通过用户id获取购物车被选中的购物项
List<OrderStatementVo.SelectedCartItemVo> selectedCartItems = cartFeignClient.getSelectedCartItem(userId);
if(selectedCartItems==null || selectedCartItems.size()<=0){
throw new RRException(StatusCode.NO_CART_ITEM_EXCEPTION.getMsg(),
StatusCode.NO_CART_ITEM_EXCEPTION.getCode());
}
List<OrderItemEntity> orderItems = createOrderItems(orderSn,selectedCartItems);
//4. 计算订单价格
calculateOrderAmount(order,orderItems);
//7. 验价
if (Math.abs(order.getPayAmount().subtract(orderParam.getTotalPrice()).doubleValue())>=0.01) {
throw new RRException(StatusCode.PRICE_VERIFY_EXCEPTION.getMsg()+
"核算金额: ¥"+orderParam.getTotalPrice()+",实际金额: ¥"+order.getPayAmount(),
StatusCode.PRICE_VERIFY_EXCEPTION.getCode());
}
//5. 保存订单记录和订单项记录
orderService.save(order);
//seata目前在AT模式下不支持批量插入记录,https://blog.csdn.net/qq_33240556/article/details/140790581,反正我们后面要换成软性事务,后面再换成批量插入
//for (OrderItemEntity orderItem : orderItems) {
// orderItemService.save(orderItem);
//}
orderItemService.saveBatch(orderItems);
//6. 锁定库存
LockStockTo lockStock = new LockStockTo();
lockStock.setOrderId(order.getId());
lockStock.setOrderSn(orderSn);
lockStock.setConsignee(order.getReceiverName());
lockStock.setConsigneeTel(order.getReceiverPhone());
lockStock.setDeliveryAddress(order.getReceiverDetailAddress());
lockStock.setPaymentWay(1);
lockStock.setOrderItems(orderItems);
R res = stockFeignClient.lockStock(lockStock);
if (res.getCode()!=0) {
throw new RRException((String) res.get("msg"),res.getCode());
}
// 8.给消息队列发送消息
OrderCreatedMessage orderCreatedMessage = new OrderCreatedMessage();
orderCreatedMessage.setOrderSn(orderSn);
rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderCreatedMessage);
//封装视图数据,注意存入分布式session的数据对应实体类最好全部放在common包下,session中有的数据不论其他服务能否用到,
// 其他服务必须包含session中所有数据的对应类型,否则其他服务即使没有使用对应的session也会直接报错
PayVo pay = new PayVo();
OrderVo orderVo = new OrderVo();
BeanUtils.copyProperties(order,orderVo);
List<OrderItemVo> orderItemVos = orderItems.stream().map(orderItem -> {
OrderItemVo orderItemVo = new OrderItemVo();
BeanUtils.copyProperties(orderItem, orderItemVo);
return orderItemVo;
}).collect(Collectors.toList());
pay.setOrder(orderVo);
pay.setOrderItems(orderItemVos);
pay.setFare(order.getFreightAmount());
pay.setPayablePrice(order.getPayAmount());
return pay;
}
[订单服务接收消息取消订单并向库存服务发送消息]
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 库存释放队列监听器
* @创建日期 2024/12/02
* @since 1.0.0
*/
queues = "order.release.order.queue") (
public class OrderReleaseQueueListenerImpl extends ServiceImpl<MessageDao,MessageEntity> implements OrderReleaseQueueListener {
RabbitTemplate rabbitTemplate;
OrderService orderService;
/**
* @param message
* @param orderCreatedMessage
* @param channel
* @描述 根据到期的锁定库存消息释放库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/02
* @since 1.0.0
*/
public void trySendMessageReleaseStock(Message message, OrderCreatedMessage orderCreatedMessage, Channel channel) {
orderService.cancelOrder(orderCreatedMessage.getOrderSn());
try{
rabbitTemplate.convertAndSend("order-event-exchange","order.release.other",orderCreatedMessage);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
} catch (Exception exception) {
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException ioException) {
ioException.printStackTrace();
}
MessageEntity messageLog = new MessageEntity();
messageLog.setMessageStatus(2);
messageLog.setClassType(orderCreatedMessage.getClass().getTypeName());
messageLog.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
messageLog.setToExchange(message.getMessageProperties().getReceivedExchange());
messageLog.setContent(JSON.toJSONString(orderCreatedMessage));
save(messageLog);
}
}
}
}
订单服务收到消息根据订单id查询数据库对应的订单状态,如果订单状态为订单创建对应的状态码,将订单状态更改为取消订单对应状态码
xxxxxxxxxx
/**
* @param orderSn
* @描述 根据订单号关闭订单
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/03
* @since 1.0.0
*/
public void cancelOrder(String orderSn) {
OrderEntity order = getOne(new QueryWrapper<OrderEntity>().eq("order_sn", orderSn));
order.setStatus(OrderConstant.OrderStatus.CANCELLED.getCode());
updateById(order);
}
通过监听消息队列消息在取消订单或者订单创建失败的情况下解锁库存
[消息队列监听]
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 库存释放队列监听器
* @创建日期 2024/12/02
* @since 1.0.0
*/
queues = "stock.release.stock.queue") (
public class StockReleaseQueueListenerImpl extends ServiceImpl<MessageDao, MessageEntity> implements StockReleaseQueueListener {
private WareSkuService wareSkuService;
/**
* @param message
* @param stockLockedMessage
* @param channel
* @描述 根据到期的锁定库存消息释放库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/02
* @since 1.0.0
*/
public void tryReleaseStock(Message message, StockLockedMessage stockLockedMessage, Channel channel) {
String orderSn = stockLockedMessage.getWareOrderTask().getOrderSn();
tryReleaseStock(orderSn,message,channel,JSON.toJSONString(stockLockedMessage),stockLockedMessage.getClass().toString());
}
/**
* @param message
* @param orderCreatedMessage
* @param channel
* @描述 取消订单自动解锁库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/03
* @since 1.0.0
*/
public void tryReleaseStock(Message message, OrderCreatedMessage orderCreatedMessage,Channel channel){
String orderSn = orderCreatedMessage.getOrderSn();
tryReleaseStock(orderSn,message,channel,JSON.toJSONString(orderCreatedMessage),orderCreatedMessage.getClass().toString());
}
/**
* @param orderSn
* @param message
* @param channel
* @param messageContent
* @param messageClassType
* @描述 解锁库存方法
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/03
* @since 1.0.0
*/
private void tryReleaseStock(String orderSn,Message message,Channel channel,String messageContent,String messageClassType){
try{
wareSkuService.tryReleaseStock(orderSn);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
} catch (Exception exception) {
try {
MessageEntity messageLog = new MessageEntity();
messageLog.setMessageStatus(2);
messageLog.setClassType(messageClassType);
messageLog.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
messageLog.setToExchange(message.getMessageProperties().getReceivedExchange());
messageLog.setContent(JSON.toJSONString(messageContent));
save(messageLog);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception excep) {
excep.printStackTrace();
}
}
}
}
}
[解锁库存]
xxxxxxxxxx
/**
* @param orderSn
* @描述 根据库存锁定消息尝试解锁库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/02
* @since 1.0.0
*/
public void tryReleaseStock(String orderSn) {
//1. 根据订单号远程查询订单服务对应的订单状态,如果订单不存在或者订单状态不是已关闭就不解锁库存
R res = orderFeignClient.getOrderByOrderSn(orderSn);
OrderTo order = res.get("order", new TypeReference<OrderTo>() {});
if(order==null && order.getStatus()==4){
return;
}
//2. 根据订单号查询库存工作单id
WareOrderTaskEntity wareOrderTaskEntity = wareOrderTaskService.getOne(new QueryWrapper<WareOrderTaskEntity>().
eq("order_sn", orderSn));
if(wareOrderTaskEntity==null){
return;
}
List<WareOrderTaskDetailEntity> wareOrderTaskDetails = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>().
eq("task_id", wareOrderTaskEntity.getId()));
for (WareOrderTaskDetailEntity wareOrderTaskDetail : wareOrderTaskDetails) {
//检索库存工作单状态,只有库存工作单状态为未解锁时才解锁库存,如果不为未解锁本条库存不解锁
if(wareOrderTaskDetail.getLockStatus()!=2){
baseMapper.releaseStock(wareOrderTaskDetail.getSkuNum(),wareOrderTaskDetail.getSkuId(),wareOrderTaskDetail.getWareId());
//解锁后将库存工作单状态更改为已解锁
wareOrderTaskDetail.setLockStatus(2);
wareOrderTaskDetailService.updateById(wareOrderTaskDetail);
}
}
}
❓:我们这里是用库存解锁时间大于取消订单时间来实现解锁库存只要订单的状态为已取消或者订单没有成功创建,就释放已经锁定的库存,但是这种方式存在很严重的问题;比如订单创建成功,但是由于各种原因,消息延迟了很久才发给消息队列,但是库存一锁定成功就将消息发送给消息队列了,导致解锁库存的消息比取消订单的消息先到期,这时候就会导致解锁库存的消息被消费,库存因为订单处于新建状态无法解锁,即使后续订单被解锁了库存也无法被解锁了;即一旦发生意外导致解锁库存的消息比取消订单的消息先到,就会发生被锁定的库存永远无法解锁的情况
🔑:让订单服务取消订单后再发一个消息路由键为order.release.other
给交换器order-event-exchange
,我们为交换器order-event-exchange
和队列stock.release.stock.queue
设定绑定关系,绑定关系设定为order.release.other.#
,让取消订单的消息被队列stock.release.stock.queue
发送给消费者库存服务。库存服务用@RabbitListener(queues="stock.release.stock.queue")
监听同一个队列stock.release.stock.queue
,用@RabbitHandler
标注的方法监听消息类型为OrderTo
,在原来解锁库存的逻辑中判断,当前库存是否解锁过,没解锁过就解锁,解锁过就不用解锁了,老师的逻辑是根据订单号查询库存工作单,根据库存工作单找到所有没有解锁的库存工作单详情调用此前解锁库存的方法进行解锁,感觉这里老师的实现不好,自己实现这部分代码
实际上解锁库存是订单取消的时候解锁一次,锁定库存成功以后一定时间再解锁一次
影响消息可靠性的因素
消息丢失:消息丢失在电商系统中是一个非常可怕的操作,比如订单消息丢失可能会影响到后续一连串比如商家确认、解锁库存、物流等等各种信息,消息可能发生丢失的原因如下:
消息从生产者发送出去,但是由于网络问题抵达RabbitMQ服务器失败,或者因为异常根本没有发送成功
这时候可以用try...catch
语句块来发送消息,发送失败在catch语句块中设置重试策略
同时给数据库创建一张消息数据库表mq_message
,建表语句如下
xxxxxxxxxx
CREATE TABLE `mq_message` (
`message_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '消息id',
`content` text COMMENT '消息内容',#序列化为json
`to_exchange` varchar(255) DEFAULT NULL COMMENT '投递交换器',
`routing_key` varchar(255) DEFAULT NULL COMMENT '路由键',
`class_type` varchar(255) DEFAULT NULL COMMENT '消息类型的全限定类名',
`message_status` int(1) DEFAULT '0' COMMENT '0-新建 1-已发送 2-错误抵达 3-已抵达',
`create_time` datetime DEFAULT NULL COMMENT '消息日志创建时间',
`update_time` datetime DEFAULT NULL COMMENT '消息日志更新时间',
PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
只要消息发送失败就给数据库存上这么一条日志,定期扫描数据库来检查消息日志状态来重新发送消息
消息到达Broker,消息只有被投递给队列才算持久化完成,一旦消息还没有到达队列,RabbitMQ服务器宕机消息就会因为还没有来得及持久化而发生丢失
开启生产者消息抵达队列确认,只要消息没有成功抵达队列就会触发生产者的returnCallback
回调,消息不能成功抵达应该设置消息重试发送和向数据库记录消息日志
开启生产者消息确认回调,只要消息成功抵达RabbitMQ服务器就触发该回调
自动ACK的状态,消费者收到消息,但是消息没有被成功消费,比如消费消息或者消费消息前出现异常或者服务器宕机,自动应答的消息会直接丢失
开启手动ACK,消息成功消费以后再手动应答接收消息,消息消费失败就手动拒绝消息让消息重新入队列,注意消息没有被应答即没有手动拒绝RabbitMQ没有收到应答的消息也会默认重新入队列再次发送
🔎:防消息丢失的核心就是做好消息生产者和消息消费者两端的消息确认机制,主要策略就是生产者的消息抵达确认回调和消费者的手动应答,凡是消息不能成功抵达服务端和消费端的消息都做好消息日志记录,定期扫描数据库,将发送失败的消息定期重新发送
消息重复:就是因为各种原因导致的消息重新投递
消息消费成功,事务已经提交,但是手动Ack的时候机器宕机或者网络连接中断导致手动Ack没有进行,RabbitMQ的消息因为没有收到应答自动将消息重新入队列并将消息状态从Unack
状态变成ready
状态,并再次将消息发送给消费者
消息消费过程中消费失败又再次重试发送消息,注意啊,虽然我们让消息消费失败消息拒绝重新入队列
解决办法是业务消息消费接口设计成幂等性接口,比如解锁库存要判断库存工作单详情的状态位,消息消费成功修改对应状态位
使用redis或者mysql防重表,将消息和业务通过唯一标识联系起来,业务被成功处理过的消息就不用再处理了
RabbitMQ的每个消息都有一个redelivered
消息属性字段,每个消息都可以通过Boolean redelivered = message.getMessageProperties().getRedelivered()
判断当前消息是否被第二次或者第N次重新投递过来的,这个一般做辅助判断,因为谁也不能保证消息在第几次消费被消费成功
消息积压:消息队列中的消息积压太多,导致消息队列的性能下降
消费者宕机导致消息积压
消费者消费能力不足,比如活动高峰期,比如消费者宕机导致的消费者集群消费能力不足,有服务完全不可用消息反复重入队列消息肯定会积压,应该设置重试次数,投递达到重试次数消息就被专门的服务处理比如存入数据库离线处理
注意消费者没有应答消费消息,队列中的消息处于Unack
状态,生产者会不停报错,让CPU飚高,非常消耗系统性能,这个问题要想办法防一下
发送者发送消息的流量太大,超出消费者的消费能力
限制发送者的流量,让服务限流业务进不来就能限制发送者的流量,不过只是因为消息中间件或者消费者能力有限就限制业务有点得不偿失
上线更多的消费者增强消息的消费能力
上线专门的消息队列消息消费服务,将消息批量从消息队列中取出来,直接写入数据库,缓解消息队列压力,然后再缓慢离线从数据库中获取消息离线处理
消息队列集群
一般都是把消息中间件专门做成一个服务,叫数据中台,负责消息发送和自动记录消息日志,消息发送失败自动进行重试,将消息发送的所有功能都考虑周到,其他服务通过调用该服务来实现消息的发送,看老师的意思,一般消息发送成功也得记录日志,这个可以作为防止消息丢失更进一步的手段,毕竟会影响性能
生产者抵达确认带数据库保存失败消息
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 RabbitMQ客户端自定义配置
* @创建日期 2024/11/01
* @since 1.0.0
*/
public class MallRabbitConfig {
private RabbitTemplate rabbitTemplate;
/**
* @return {@link MessageConverter }
* @描述 给容器中注入一个使用Jackson将消息对象序列化为json对象的消息转换器
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/01
* @since 1.0.0
*/
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
public void initRabbitTemplate(){
/*
1. 设置RabbitMQ服务器收到消息后的确认回调ConfirmCallback
配置配置项spring.rabbitmq.publisher-confirms=true
为rabbitTemplate设置回调实例化对象confirmCallback
*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 当前消息的唯一关联标识,里面的id就是消息标识的唯一id
* @param ack 消息是否成功收到
* @param cause 消息发送失败的原因
* @描述 1. 只要消息抵达Broker就会触发该回调,与消费者和消息是否入队列无关
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/02
* @since 1.0.0
*/
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("message confirm: {correlationData:"+correlationData+
"ack:"+ack+
"cause:"+cause+"}");
}
});
/*
2.设置RabbitMQ队列没有收到消息的确认回调ReturnCallback
配置配置项spring.rabbitmq.publisher-returns=true
配置配置项spring.rabbitmq.template.mandatory=true
为rabbitTemplate设置回调实例化对象returnCallback
* */
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* @param message 投递失败的消息本身的详细信息
* @param replyCode 导致消息投递失败的错误状态码
* @param replyText 导致消息投递失败的错误原因
* @param exchange 当时该消息发往的具体交换器
* @param routingKey 当时该消息的具体路由键
* @描述
* @author Earl
* @version 1.0.0
* @创建日期 2024/11/03
* @since 1.0.0
*/
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("message lose: {message:"+message+
"replyCode:"+replyCode+
"replyText:"+replyText+
"exchange:"+exchange+
"routingKey"+routingKey);
}
});
}
}
消费者手动ACK
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 库存释放队列监听器
* @创建日期 2024/12/02
* @since 1.0.0
*/
queues = "stock.release.stock.queue") (
public class StockReleaseQueueListenerImpl extends ServiceImpl<MessageDao, MessageEntity> implements StockReleaseQueueListener {
private WareSkuService wareSkuService;
/**
* @param message
* @param stockLockedMessage
* @param channel
* @描述 根据到期的锁定库存消息释放库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/02
* @since 1.0.0
*/
public void tryReleaseStock(Message message, StockLockedMessage stockLockedMessage, Channel channel) {
String orderSn = stockLockedMessage.getWareOrderTask().getOrderSn();
tryReleaseStock(orderSn,message,channel,JSON.toJSONString(stockLockedMessage),stockLockedMessage.getClass().toString());
}
/**
* @param message
* @param orderCreatedMessage
* @param channel
* @描述 取消订单自动解锁库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/03
* @since 1.0.0
*/
public void tryReleaseStock(Message message, OrderCreatedMessage orderCreatedMessage,Channel channel){
String orderSn = orderCreatedMessage.getOrderSn();
tryReleaseStock(orderSn,message,channel,JSON.toJSONString(orderCreatedMessage),orderCreatedMessage.getClass().toString());
}
/**
* @param orderSn
* @param message
* @param channel
* @param messageContent
* @param messageClassType
* @描述 解锁库存方法
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/03
* @since 1.0.0
*/
private void tryReleaseStock(String orderSn,Message message,Channel channel,String messageContent,String messageClassType){
try{
wareSkuService.tryReleaseStock(orderSn);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
} catch (Exception exception) {
try {
MessageEntity messageLog = new MessageEntity();
messageLog.setMessageStatus(2);
messageLog.setClassType(messageClassType);
messageLog.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
messageLog.setToExchange(message.getMessageProperties().getReceivedExchange());
messageLog.setContent(JSON.toJSONString(messageContent));
save(messageLog);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception excep) {
excep.printStackTrace();
}
}
}
}
}
使用MP数据库消息日志记录
同时给数据库创建一张消息数据库表mq_message
,建表语句如下
xxxxxxxxxx
CREATE TABLE `mq_message` (
`message_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '消息id',
`content` text COMMENT '消息内容',#序列化为json
`to_exchange` varchar(255) DEFAULT NULL COMMENT '投递交换器',
`routing_key` varchar(255) DEFAULT NULL COMMENT '路由键',
`class_type` varchar(255) DEFAULT NULL COMMENT '消息类型的全限定类名',
`message_status` int(1) DEFAULT '0' COMMENT '0-新建 1-已发送 2-错误抵达 3-已抵达',
`create_time` datetime DEFAULT NULL COMMENT '消息日志创建时间',
`update_time` datetime DEFAULT NULL COMMENT '消息日志更新时间',
PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
MP插入消息记录
[实体类]
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 库存服务消息实体
* @创建日期 2024/12/02
* @since 1.0.0
*/
"sms_message") (
public class MessageEntity implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 消息id
*/
private Long messageId;
/**
* 消息内容
*/
private String content;
/**
* 投递交换器
*/
private String toExchange;
/**
* 路由键
*/
private String routingKey;
/**
* 消息类型的全限定类名
*/
private String classType;
/**
* 0-新建 1-已发送 2-错误抵达 3-已抵达
*/
private Integer messageStatus;
/**
* 消息日志创建时间
*/
fill = FieldFill.INSERT) (
private Date createTime;
/**
* 消息日志更新时间
*/
fill = FieldFill.INSERT_UPDATE) (
private Date updateTime;
}
[持久化接口]
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 库存服务消息
* @创建日期 2024/12/02
* @since 1.0.0
*/
public interface MessageDao extends BaseMapper<MessageEntity> {
}
[持久化接口对应xml]
xxxxxxxxxx
<mapper namespace="com.earl.mall.stock.dao.MessageDao">
<!-- 可根据自己的需求,是否要使用 -->
<resultMap type="com.earl.mall.stock.entity.MessageEntity" id="messageMap">
<result property="messageId" column="message_id"/>
<result property="content" column="content"/>
<result property="toExchange" column="to_exchange"/>
<result property="routingKey" column="routing_key"/>
<result property="classType" column="class_type"/>
<result property="messageStatus" column="message_status"/>
<result property="createTime" column="create_time"/>
<result property="updateTime" column="update_time"/>
</resultMap>
</mapper>
[业务实现类]
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 库存释放队列监听器
* @创建日期 2024/12/02
* @since 1.0.0
*/
queues = "stock.release.stock.queue") (
public class StockReleaseQueueListenerImpl extends ServiceImpl<MessageDao, MessageEntity> implements StockReleaseQueueListener {
private WareSkuService wareSkuService;
/**
* @param message
* @param stockLockedMessage
* @param channel
* @描述 根据到期的锁定库存消息释放库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/02
* @since 1.0.0
*/
public void tryReleaseStock(Message message, StockLockedMessage stockLockedMessage, Channel channel) {
String orderSn = stockLockedMessage.getWareOrderTask().getOrderSn();
tryReleaseStock(orderSn,message,channel,JSON.toJSONString(stockLockedMessage),stockLockedMessage.getClass().toString());
}
/**
* @param message
* @param orderCreatedMessage
* @param channel
* @描述 取消订单自动解锁库存
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/03
* @since 1.0.0
*/
public void tryReleaseStock(Message message, OrderCreatedMessage orderCreatedMessage,Channel channel){
String orderSn = orderCreatedMessage.getOrderSn();
tryReleaseStock(orderSn,message,channel,JSON.toJSONString(orderCreatedMessage),orderCreatedMessage.getClass().toString());
}
/**
* @param orderSn
* @param message
* @param channel
* @param messageContent
* @param messageClassType
* @描述 解锁库存方法
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/03
* @since 1.0.0
*/
private void tryReleaseStock(String orderSn,Message message,Channel channel,String messageContent,String messageClassType){
try{
wareSkuService.tryReleaseStock(orderSn);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
} catch (Exception exception) {
try {
MessageEntity messageLog = new MessageEntity();
messageLog.setMessageStatus(2);
messageLog.setClassType(messageClassType);
messageLog.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
messageLog.setToExchange(message.getMessageProperties().getReceivedExchange());
messageLog.setContent(JSON.toJSONString(messageContent));
save(messageLog);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception excep) {
excep.printStackTrace();
}
}
}
}
}
session的原理
原理图
用户登录成功可以将用户相关信息保存到以Map作为底层的session
中,并指定浏览器保存一个属性名为JSESSIONID
的cookie
,以后浏览器访问服务器会携带包含该参数的cookie
,直到浏览器关闭才会清除该cookie
,该JSESSIONID
作为服务器识别用户身份的标识,通过该ID来查询用户在服务器中保存的特定状态信息
传统session存在的问题
1️⃣:在集群环境下,一个服务会被复制多份,但是运行过程中集群中不同运行实例上的session
信息无法被同步,即在一台服务器上被存入session
的数据在另一台服务器或者运行实例上无法被取出
2️⃣:在分布式环境下,不同服务间的session
不能共享,比如我在认证服务进行的数据认证并需要在跳转首页时验证用户的登录状态并传递用户信息到首页,但是首页在商品服务,我在认证服务存入session
的数据无法被共享到商品服务的session
中
3️⃣:cookie
的默认的作用域[就是下图的Domain字段]是同一个三级域名下,域名不一样,在发起请求时cookie
无法被携带
这里登录页面的cookie
可以在原理图中看到,请求域名发生变化,cookie
并没有被携带,也无法通过cookie
中的JSESSIONID
来匹配用户会话状态信息
注意以下所有的方案解决的是同二级域名情况下,即同二级域名下浏览器发起请求携带完整的cookie数据,一旦二级域名发生变化,即使同一个客户端向同一台服务器发起请求也不会携带对应的cookie信息,这属于子域session共享问题
方案一:网关负载均衡同一个用户前后请求到不同的服务实例上,我们希望用户请求不管被负载到哪一台服务器上都能访问到同一个用户在系统中完整的session
数据,要实现该目的我们可以使用session
复制[同步]方案
session
复制[同步]方案的原理是两个服务实例之间互相同步session
,取两个服务实例session
数据的并集
这种方式的优点是Tomcat服务器原生支持,只需要修改Tomcat配置文件就能让多个Tomcat服务器之间互相复制Session
缺点也非常明显
session同步是网络IO过程,数据同步存在延迟,占用大量系统网络带宽,降低系统业务处理能力
真正的致命缺陷是session存储在内存中,任意一台服务实例都保存的是session全量数据,每增加一台服务实例,session数据就要增加一份,有N个服务实例相较于默认的单份session设计session同步带来的内存开销会增加N倍,小型系统服务实例数量少,用户少可以这么玩,大型分布式集群系统,这种方式不可取,会带来巨量的额外内存开销,内存贵而且上限低,服务器内存一大就会带动主板甚至整个服务器的更新换代,非常的昂贵
方案二:服务端不存储session,服务端设置保存session数据到客户端的cookie中,用户每次请求都携带完整的cookie信息,服务器从cookie中读取希望在session中共享的数据,相当于用户自己整理一个档案袋,走到哪里都携带这个档案袋。这种方式也全是缺点,实际开发中不会使用这种方式
session数据存放在cookie中,存在巨大的泄露、篡改和窃取等安全隐患,cookie中的数据在浏览器中可以直接看见,而且还可以直接手动修改,这个是不能用的核心原因
cookie中数据的总长度有限制,一般是4K,不能保存大量数据
每次请求都会携带完整cookie信息,对网络带宽资源是严重的浪费
方案三:使用哈希一致性,只要访问者的IP是同一个,我们就永远将来自该IP的请求转发到同一台服务器上,包括四层代理ip_hash
方案、七层代理业务字段hash
方案
这种方式也存在问题,比如某一台服务器宕机,而且现在运营商分配的IP地址也不一定是固定的,特别是移动互联网,用户的位置随时可能会变换
因为IP的不稳定因素,我们可以使用一个标识用户身份的字段比如用户id来做哈希运算,将一个用户的所有请求分配到同一台服务器上
优点
只需要更改nginx
或者网关的配置,不需要更改代码,
因为哈希运算保证了运算的离散性,只要保证做哈希运算字段的数据离散性就能保证正常的负载均衡
支持服务器集群的大量水平扩容,这一点session
复制不行
缺点
session
仍然存在于服务器中,服务器宕机或者重启可能会导致部分session
丢失,影响到业务,比如用户登录状态信息丢失导致用户需要重新登录
服务器水平扩展的时候,哈希取模以后用户请求的服务器会重新分布,但是想要将session
也转移到对应的服务器上难度比较高
方案四:统一在一个公共第三方存储session数据,把session数据存到数据库或者Redis
等一些NoSQL
中间件中
优点:
数据没有安全隐患,只要保证第三方存储设备中的数据安全,就没有人能篡改session中的数据
服务器水平扩容很方便
服务器重启或者扩容都不会有session丢失
缺点:
增加一次网络调用,而且使用该方案需要修改原来使用session
的代码,将从session
中获取数据的代码改为从redis
中获取数据,从redis
中获取数据比直接从session
中获取数据慢很多
Spring
从这个方案入手推出了SpringSession
方案来解决session
共享的问题
子域auth.earlmall.com
对应请求下发的cookie
,如果我们将作用域的范围手动设置成更大的顶级域名earlmall.com
,这样子域auth.earlmall.com
请求中下发的cookie在更大的顶级域名请求中仍然能正常携带完成的cookie信息,且子域名之间也可以共享同一个cookie
,即一个父域名下的所有子域可以共享一个作用域Domain字段为父域名的cookie,这是浏览器的策略决定的,浏览器在发起指定作用域下的请求时会自动将cookie
信息携带在请求头中,携带的形式是cookie: JSESSIONID=XXXXXXX
比如cookie中保存了用户的JSESSIONID
,默认情况下该cookie的作用域Domain字段为对应的二级域名auth.earlmall.com
,此时子域名变化了即使访问同一台服务器也不会携带cookie信息;我们可以在服务器下发cookie的时候手动设置为earlmall.com
,这样就能实现即使访问其他子域名或者访问顶级域名时都会携带子域名auth.earlmall.com
下发的cookie[因为登录服务是子域auth.earlmall.com
实现的,JSESSIONID
也是通过此处使用session下发的]
为了在一个顶级域名下的所有子域名和顶级域名间共享cookie,服务器在下发cookie的时候一定要将cookie的作用域设置为顶级域名,
下发cookie的时候调用的是httpServletResponse.addCookie(new Cookie(String name,String value))
来下发的cookie,这个cookie对象有一个setDomain(String domain)
方法可以设置cookie的作用域,该作用域默认域名是当前请求的二级域名,因此我们下发cookie
的时候需要将作用域更改为顶级域名,但是一些默认cookie
的下发是由Tomcat
本身来控制的,比如第一次使用session
时Tomcat
就会给cookie
中下发一个JSESSIONID
的cookie
,如果我们自己来调整这段逻辑还是非常麻烦的
SpringSession
也考虑了这个问题并封装了对cookie
的Domain
操作,来解决session
在一个顶级域名下的所有子域共享的问题
浏览器访问认证服务进行登录,登录后我们将用户的登录信息存入Redis
,并给浏览器下发JSESSIONID
的cookie
,将该cookie
的Domain
作用域从子域名auth.earlmall.com
改为顶级域名earlmall.com
来放大作用域实现cookie
在同一个顶级域名下所有请求共享,所有的服务都使用JSESSIONID
从Redis
中取出session
中共享的数据,实现后端统一第三方存储session
,前端拿着用户凭证JSESSIONID
去各个服务从第三方中获取session
中的数据
使用SpringSession
来简化将session
数据统一存储到第三方Redis
以及解决session
跨域共享问题的开发
SpringSession项目官网通过spring官网--Projects--SpringSession--Learn可以找到SpringSession的官方文档,其中章节Samples and Guides示例和向导是SpringSession的快速上手指南,章节HttpSession Integration是SpringSession基于各种第三方存储介质的整合
SpringBoot
基于Redis
整合SpringSession
解决session跨域跨服务共享问题
🔎:在Samples and Guides中找到并点击HttpSession with Redis,可以找到对应的使用引导
引入依赖
xxxxxxxxxx
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
</dependency>
检查一下SpringSession
操作redis
需不需要引入org.springframework.boot:spring-boot-starter-data-redis
,经过验证,SpringSession
依赖于org.springframework.data:spring-data-redis
,所以无需再额外引入
xxxxxxxxxx
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
为了性能我们可以把默认的lettuce-core
排除掉使用最新的io.lettuce:lettuce-core:5.2.0.RELEASE
,老版本对内存管理存在问题,并发量一高就会大量抛异常
xxxxxxxxxx
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/io.lettuce/lettuce-core -->
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.2.0.RELEASE</version>
</dependency>
SpringBoot
对SpringSession
的配置
选择session
的存储介质为redis
[必选]
xxxxxxxxxx
spring.session.store-type=redis
配置session
的超时时间[可选,默认配置是30分钟] [默认单位是秒,要指定分钟可以指定成如下格式30m]
xxxxxxxxxx
server.servlet.session.timeout=30m
配置redis
中session
的刷新策略[可选]
xxxxxxxxxx
spring.session.redis.flush-mode=on-save
配置redis
中session
存储的前缀[可选] [SpringSession
创建的缓存也和使用SpringCache
创建的缓存一样会创建相应的目录来管理]
xxxxxxxxxx
spring.session.redis.namespace=spring:session
SpringBoot
配置Redis
的连接信息
这个一般在项目中使用Redis
就会主动配置
xxxxxxxxxx
spring.redis.host = localhost #redis服务器主机IP地址
spring.redis.password = xxxx #redis服务器的登录密码
spring.redis.port = 6379 #redis服务器的端口号
Servlet
容器初始化原理
SpringBoot
配置好了一个名为SpringSessionRepositoryFilter
的组件,该组件实现了Filter
接口,相当于该组件具备过滤器的功能,这个SpringSessionRepositoryFilter
将原生HttpSession
替换成我们Spring
的自定义的session
实现,在该实现中
配置组件
xxxxxxxxxx
public class Config {
public LettuceConnectionFactory connectionFactory(){
return new LettuceConnectionFactory();
}
}
组件RedisConnectionFactory
已经被SpringBoot
自动注入到IoC
容器中,
我们只需要在配置类或者启动类上添加注解@EnableRedisHttpSession
开启整合Redis
作为session
存储的功能
卧槽这么牛皮,Spring
用自定义session
取代了原来Tomcat
自带的HttpSession
,我们原来操作session
都是直接在控制器方法的参数列表指定HttpSession
,Spring
容器自动进行注入tomcat
原生的HttpSession
,我们使用Tomcat
原生的HttpSession
的API来操作Session
,现在Spring
使用自定义的SpringSessionRepositoryFilter
来替换Tomcat
原生的HttpSession
,我们在控制器方法注入HttpSession
时会自动注入SpringSessionRepositoryFilter
,而且SpringSessionRepositoryFilter
操作session
的api
和HttpSession
是一样的,这意味着我们可以不需要更改代码只需要配置SpringSession
就能丝滑使用SpringSession
替代Tomcat
的原生HttpSession
,原来对session
的操作一样生效,只是更换了方法的具体实现,把session
存到redis
中去了,下发cookie
的时候也将对应的作用域设置为了顶级域名,这个就是Java
中多态的思想,猜测HttpSession
是一个接口,Tomcat
的原生HttpSession
只是其中一个实现类
经过确认,确实如此,javax.servlet.http.HttpSession
是tomcat-embed-core:9.0.24
中的包下的一个接口,下面有多个实现类,Tomcat
默认使用的是StandardSession
做完以上的步骤,在执行操作session
的方法时仍然会报错SerializationException
,原因是在执行session
操作的时候无法进行序列化,这是因为我们要操作一个对象,将对象从当前服务器内存中保存到第三方存储介质中,这个过程涉及到IO过程,暂时认为所有的IO过程都要对内存中的对象进行序列化后才能传输,序列化的目的是将一个内存中的对象序列化为二进制流或者串,我这里先肤浅地认为只有二进制流或者串才能执行IO操作,具体的原理以前讲的很浅,后面看JavaIO
中有没有补充
核心就是要使用SpringSession
操作的数据因为要存储到第三方公共存储介质中,需要被操作的数据能够被序列化,SpringSession
默认使用JDK序列化,JDK的默认序列化需要被序列化的对象对应的类实现序列化接口才能将对应的对象进行序列化
注意RedisTemplate
的序列化实现好像使用的是注入序列化器来对缓存的数据专门进行序列化,那个好像没有专门要求被缓存的数据需要实现序列化接口
xxxxxxxxxx
package com.earl.common.vo;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* @author Earl
* @version 1.0.0
* @描述 用户基础信息
* @创建日期 2024/10/17
* @since 1.0.0
*/
public class UserBaseInfoVo implements Serializable {
/**
* id
*/
private Long id;
/**
* 会员等级id
*/
private Long levelId;
/**
* 用户名
*/
private String username;
/**
* 昵称
*/
private String nickname;
/**
* 头像
*/
private String header;
/**
* 性别
*/
private Integer gender;
/**
* 个性签名
*/
private String sign;
/**
* 积分
*/
private Integer integration;
/**
* 成长值
*/
private Integer growth;
/**
* 注册时间
*/
private Date createTime;
}
SpringSession
在第一次执行session
操作后,会给客户端下发一个名为SESSION
的cookie
,该SESSION
令牌会替代原来的JSESSIONID
令牌
注意默认情况下,SpringSession
设置的作用域也是当前二级域名;
同时,跨服务使用SpringSession
基于Redis
来共享session
,两个服务都需要引入spring-session-data-redis
❓:我们在mall-auth
和mall-product
两个服务都引入spring-session-data-redis
,并将在mall-auth
即请求域名为auth.earlmall.com
下发的cookie
的作用域手动改成earlmall.com
,但是此时我们在mall-product
中获取mall-auth
存入的session
数据仍然报错SerializationException
,
🔑:经过分析,这是因为我们在mall-product
中要从redis
中获取被序列化的数据,并且要将该数据反序列化为对象,结果在反序列化的过程中在mall-product
中找不到数据对应的类,即SerializationException
是由ClassNotFoundException
导致的,因此需要在分布式集群中进行session
共享的类最好放在common
包下;
同时数据对象在被序列化的时候,会在序列化结果中保存序列化前的对象对应的全限定类名,因此直接将对应的类向使用session
数据的目标服务拷贝一份也是不行的,因为全限定类名不同,直接放在common
包下最保险,而且缓存中的全限定类名与实际类名不同,反序列化也会失败,实体类的全限定类名发生了变化一定要清空缓存
❓:目前使用SpringSession
基于公共第三方Redis
存储session
数据解决了session
跨服务共享的问题,但是目前存在两个问题,第一个问题是下发cookie
的作用域仍然是对应下发cookie
请求的二级域名,无法解决子域session
共享问题,第二个问题是SpringSession
默认使用的是JDK
自带的序列化器,我们希望能够使用字符串序列化器将对象序列化为json
对象存储在Redis
中,这样也方便我们自己查看一些出问题的session
数据
🔑:我们可以通过自定义SpringSession
来解决该问题
主要是配置JSON
序列化器和修改下发cookie
的作用域
使用JSON
序列化器来序列化存储的数据的快速使用文档参考章节Samples and Guides示例和向导中的HttpSession with Redis JSON serialization,查看配置文件发现没有多余的配置,给的代码中的SessionConfig.java
是SpringSession使用JSON
序列化器的相关配置
更改下发cookie
的作用域需要使用CookieSerializer
,相关的参考文档在spring官网--Projects--SpringSession--Learn--API Documentation的最后一个Using CookieSerializer,这个文档有点东西啊,唉,怎么这么累啊;改作用域相当于需要自定义cookie
,通过暴露CookieSerializer
作为容器组件,这里没讲清楚CookieSerializer
系列化器与设置cookie
参数的关系,先记着吧,迟早要读文档的
SessionConfig.java
通过给容器中注入RedisSerializer
替换掉SpringSession
默认的序列化机制就能把原来使用JDK
默认的序列化器换成JSON
序列化器
使用了自定义JSON
序列化器,不使用JDK默认的序列化器,实体类可以无需实现Serializable
接口
xxxxxxxxxx
/*
* Copyright 2014-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sample.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.security.jackson2.SecurityJackson2Modules;
/**
* @author jitendra on 3/3/16.
*/
// tag::class[]
public class SessionConfig implements BeanClassLoaderAware {
private ClassLoader loader;
public RedisSerializer<Object> springSessionDefaultRedisSerializer() {
return new GenericJackson2JsonRedisSerializer(objectMapper());
}
/**
* Customized {@link ObjectMapper} to add mix-in for class that doesn't have default
* constructors
* @return the {@link ObjectMapper} to use
*/
private ObjectMapper objectMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModules(SecurityJackson2Modules.getModules(this.loader));
return mapper;
}
/*
* @see
* org.springframework.beans.factory.BeanClassLoaderAware#setBeanClassLoader(java.lang
* .ClassLoader)
*/
public void setBeanClassLoader(ClassLoader classLoader) {
this.loader = classLoader;
}
}
// end::class[]
实际上雷神没有像上面一样配置的这么麻烦,下面是雷神配置示例
弹幕说:序列化不生效的注意,新版本需要将bean的方法名改为springSessionDefaultRedisSerializer
xxxxxxxxxx
public class EarlmallSessionConfig{
public RedisSerializer<Object> springSessionDefaultRedisSerializer() {
return new GenericJackson2JsonRedisSerializer();
}
}
配置CookieSerializer
配置cookie
的最大有效时间[默认配置是Session
,即浏览器一关cookie就失效]
xxxxxxxxxx
cookieSerializer.setCookieMaxAge(int cookieMaxAge)
配置cookie
的作用域为顶级域名[默认配置是二级域名,我们手动扩大这个作用域来实现session跨域共享]
xxxxxxxxxx
serializer.setDomainNamePattern("^.+?\\.(\\w+\\.[a-z]+)$");
配置第一次使用session
默认下发cookie
的名字
xxxxxxxxxx
serializer.setCookieName("JSESSIONID");
这个待补充
xxxxxxxxxx
serializer.setCookiePath("/");
配置示例
SpringSession
基于Redis
的配置比较麻烦啊,因为SpringSession
要使用Redis
,但是有些服务不需要使用Redis
。所以需要使用Session
数据的服务就需要搭建SpringSession
和Redis
的环境,如果直接配置在Common
包下也会显得比较臃肿
xxxxxxxxxx
public class EarlmallSessionConfig{
public CookieSerializer cookieSerializer() {
DefaultCookieSerializer cookieSerializer = new DefaultCookieSerializer();
cookieSerializer.setCookieName("EARLSESSIONID");
cookieSerializer.setDomainNamePattern("earlmall.com");
return cookieSerializer;
}
public RedisSerializer<Object> springSessionDefaultRedisSerializer() {
return new GenericJackson2JsonRedisSerializer();
}
}
@EnableRedisHttpSession
的原理
使用SpringSession
除了能非侵入性实现分布式跨服务跨域session
共享外还考虑了很多边缘问题,比如只要session
中的数据被使用了就会自动为session
中对应的数据自动续期
@EnableRedisHttpSession
注意:在spring-session 2.2.1
版本的时候,放入的不是RedisOperationsSessionRepository
了,而是RedisIndexedSessionRepository
核心原理是@EnableRedisHttpSession
注解导入配置类RedisHttpSessionConfiguration
,该配置类给IoC
容器中注入了一个基于Redis
增删改查session
的持久化层组件RedisOperationsSessionRepository
,该配置类还继承自类SpringHttpSessionConfiguration
,在该父配置类中给容器注入了一个SessionRepositoryFilter
即session
存储过滤器,该组件的父类OncePerRequestFilter
实现了Filter
接口,该session存储过滤器在构造完成时就会将RedisOperationsSessionRepository
注入成为属性完成初始化,通过父类OncePerRequestFilter
实现的doFilter()
方法调用SessionRepositoryFilter
自己实现的doFilterInterval()
方法,在该方法中即前置过滤器链中将RedisOperationsSessionRepository
放到本次请求的请求域中,并将原生的HttpServletRequest
和HttpServletResponse
和应用上下文ServletContext
包装成相应的请求包装类SessionRepositoryFilter<S>.SessionRepositoryRequestWrapper
和响应包装类SessionRepositoryFilter.SessionRepositoryResponseWrapper
,并在放行过滤器链的filterChain.doFilter(wrappedRequest, wrappedResponse);
方法中传参对应的请求和响应包装类,即后续的过滤器链和业务方法都是处理的请求和响应的包装类,我们在控制器方法中获取的HttpSession
组件本质是Spring
通过httpServletRequest.getSession()
方法获取的session
对象,当我们使用SpringSession
并使用注解@EnableRedisHttpSession
开启SpringSession
功能后就在前置过滤器链中将原生的HttpServletRequest
替换成了同样实现了HttpServletRequest
接口的包装类SessionRepositoryFilter<S>.SessionRepositoryRequestWrapper
,实际上调用的getSession
方法是包装类实现的,在该方法中通过持久化层组件RedisOperationsSessionRepository
来实现对session
的基于Redis
的持久化操作并最终得到session
对象
持久化层SessionRepository
也是一个接口,我们使用的第三方存储介质是redis,而且导入的是基于redis
的SpringSession场景启动器spring-session-data-redis
,因此默认使用的是子接口FindByIndexNameSessionRepository
下的唯一实现类RedisOperationsSessionRepository
,此外还有直接实现类MapSessionRepository
使用内存来保存session;此外如果我们导入基于JDBC的SpringSession
场景启动器还可以使用数据库来保存session,如果导入基于MongoDB
的SpringSession
场景启动器我们也可以使用MongoDB
来保存session,也会有相应的数据库持久层
以上原理就是典型的装饰者模式的应用,实现代码的非侵入性修改
xxxxxxxxxx
RetentionPolicy.RUNTIME) (
ElementType.TYPE}) ({
RedisHttpSessionConfiguration.class})//1️⃣ @EnableRedisHttpSession注解为容器导入了配置类RedisHttpSessionConfiguration ({
public @interface EnableRedisHttpSession {
int maxInactiveIntervalInSeconds() default 1800;
String redisNamespace() default "spring:session";
RedisFlushMode redisFlushMode() default RedisFlushMode.ON_SAVE;
String cleanupCron() default "0 * * * * *";
}
1️⃣
public class RedisHttpSessionConfiguration extends SpringHttpSessionConfiguration implements BeanClassLoaderAware, EmbeddedValueResolverAware, ImportAware, SchedulingConfigurer {//1️⃣-1️⃣ 配置类RedisHttpSessionConfiguration继承了SpringHttpSessionConfiguration
...
//向容器中添加组件RedisOperationsSessionRepository,这个组件从名字上能看出是基于Redis操作Session的数据化持久层,相当于基于Redis操作session的DAO,这个就是session增删改查的封装类,这里面定义了大量类似于getSession获取session,findById查找session,deleteById删除session的操作redis的大量增删改查方法
public RedisOperationsSessionRepository sessionRepository() {
RedisTemplate<Object, Object> redisTemplate = this.createRedisTemplate();
RedisOperationsSessionRepository sessionRepository = new RedisOperationsSessionRepository(redisTemplate);
sessionRepository.setApplicationEventPublisher(this.applicationEventPublisher);
if (this.defaultRedisSerializer != null) {
sessionRepository.setDefaultSerializer(this.defaultRedisSerializer);
}
sessionRepository.setDefaultMaxInactiveInterval(this.maxInactiveIntervalInSeconds);
if (StringUtils.hasText(this.redisNamespace)) {
sessionRepository.setRedisKeyNamespace(this.redisNamespace);
}
sessionRepository.setRedisFlushMode(this.redisFlushMode);
int database = this.resolveDatabase();
sessionRepository.setDatabase(database);
return sessionRepository;
}
...
}
1️⃣-1️⃣
public class SpringHttpSessionConfiguration implements ApplicationContextAware {
...
//该@PostConstruct注解的意思是只要类SpringHttpSessionConfiguration调用构造方法实例化以后就会立即执行该方法,该方法的作用是初始化cookieSerializer对象,如果我们自定义了cookieSerializer就使用我们自定义的,如果没有自定义就使用默认的CookieSerializer
public void init() {
CookieSerializer cookieSerializer = this.cookieSerializer != null ? this.cookieSerializer : this.createDefaultCookieSerializer();
this.defaultHttpSessionIdResolver.setCookieSerializer(cookieSerializer);
}
//SessionEventHttpSessionListenerAdapter是监听器,监听session相关的各种事件,比如服务器停机session的序列化和反序列化
public SessionEventHttpSessionListenerAdapter sessionEventHttpSessionListenerAdapter() {
return new SessionEventHttpSessionListenerAdapter(this.httpSessionListeners);
}
//1️⃣-1️⃣-1️⃣ 给容器中注入一个SessionRepositoryFilter即session存储过滤器,该过滤器实现了Servlet中的Filter接口,每个请求都会经过该过滤器进行相应请求处理
public <S extends Session> SessionRepositoryFilter<? extends Session> springSessionRepositoryFilter(SessionRepository<S> sessionRepository) {
SessionRepositoryFilter<S> sessionRepositoryFilter = new SessionRepositoryFilter(sessionRepository);
sessionRepositoryFilter.setServletContext(this.servletContext);
sessionRepositoryFilter.setHttpSessionIdResolver(this.httpSessionIdResolver);
return sessionRepositoryFilter;
}
...
}
1️⃣-1️⃣-1️⃣
-2147483598) (
//SessionRepositoryFilter继承了OncePerRequestFilter,OncePerRequestFilter实现了Filter接口,在OncePerRequestFilter中实现的doFilter方法中调用了抽象方法doFilterInternal,该方法被子类SessionRepositoryFilter实现,SpringSession的核心就是这个doFilterInternal方法
public class SessionRepositoryFilter<S extends Session> extends OncePerRequestFilter {
...
public SessionRepositoryFilter(SessionRepository<S> sessionRepository) {//sessionRepositoryFilter在构造的时候就自动注入上述操作session的持久化组件RedisOperationsSessionRepository
if (sessionRepository == null) {
throw new IllegalArgumentException("sessionRepository cannot be null");
} else {
this.sessionRepository = sessionRepository;
}
}
...
//SpringSession的核心原理就是该方法,即SessionRepositoryFilter中的doFilterInternal方法
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
request.setAttribute(SESSION_REPOSITORY_ATTR, this.sessionRepository);//给请求域中存放操作session的持久层组件sessionRepository即此前的RedisOperationsSessionRepository,给请求域存在的数据可以在当次请求处理期间被到处共享
SessionRepositoryFilter<S>.SessionRepositoryRequestWrapper wrappedRequest = new SessionRepositoryFilter.SessionRepositoryRequestWrapper(request, response, this.servletContext);//将原生的请求、响应和servlet应用上下文包装成SessionRepositoryFilter<S>.SessionRepositoryRequestWrapper即一个包装请求对象,这是一个典型的装饰者模式
SessionRepositoryFilter.SessionRepositoryResponseWrapper wrappedResponse = new SessionRepositoryFilter.SessionRepositoryResponseWrapper(wrappedRequest, response);//将包装后的请求对象和原生的响应对象包装成一个包装响应对象SessionRepositoryFilter.SessionRepositoryResponseWrapper
try {
filterChain.doFilter(wrappedRequest, wrappedResponse);//1️⃣-1️⃣-1️⃣-1️⃣ 特别注意此处调用filterChain的doFilter方法放行的时候,传参不是原生的request和response,而是包装过的包装请求对象wrappedRequest和包装响应对象,整个过滤器链执行到此处,此前的过滤器链都是对原生的请求和响应进行处理,而后面的过滤器链包括业务方法都是对包装后的请求和响应进行处理,即将包装后的请求和响应对象全部应用到了整个执行链,这里直接跳过中间过程到控制器方法对包装请求和响应的处理
} finally {
wrappedRequest.commitSession();
}
}
...
private final class SessionRepositoryRequestWrapper extends HttpServletRequestWrapper {
...
private final class SessionCommittingRequestDispatcher implements RequestDispatcher {
...
}
private final class HttpSessionWrapper extends HttpSessionAdapter<S> {
...
}
}
private final class SessionRepositoryResponseWrapper extends OnCommittedResponseWrapper {
...
}
}
1️⃣-1️⃣-1️⃣-1️⃣
"/weibo/success") (
public String weiboAuthSuccessThen(String code,
HttpSession session,
RedirectAttributes attributes,
HttpServletRequest request){//实际上Spring向控制器方法中自动注入的HttpSession就是httpServletRequest.getSession(),即我们要获取session就会从请求中去获取session,但是我们的请求已经在过滤器链中被包装成了SessionRepositoryFilter<S>.SessionRepositoryRequestWrapper,因此Spring在使用SpringSession的情况下调用request.getSession()获取session实际上调用的包装类SessionRepositoryFilter<S>.SessionRepositoryRequestWrapper的getSession方法
try {
UserBaseInfoVo user = authService.weiboAuthSuccessThen(code);
HttpSession session1 = request.getSession();//1️⃣-1️⃣-1️⃣-1️⃣-1️⃣ 这里在使用SpringSession的时候实际调用的是wrappedRequest.getSession()
session.setAttribute("user",user);
return "redirect:http://earlmall.com";
}catch (RRException e){
attributes.addFlashAttribute("error",e.getCode()+":"+e.getMsg());
return "redirect:http://auth.earlmall.com/login.html";
}
}
1️⃣-1️⃣-1️⃣-1️⃣-1️⃣
private final class SessionRepositoryRequestWrapper extends HttpServletRequestWrapper {//SessionRepositoryRequestWrapper继承了HttpServletRequestWrapper,HttpServletRequestWrapper实现了HttpServletRequest
...
//这个就是SessionRepositoryRequestWrapper的getSession方法,
public SessionRepositoryFilter<S>.SessionRepositoryRequestWrapper.HttpSessionWrapper getSession(boolean create) {
SessionRepositoryFilter<S>.SessionRepositoryRequestWrapper.HttpSessionWrapper currentSession = this.getCurrentSession();//先通过getCurrentSession()获取当前的session即currentSession,如果能获取到则直接返回,猜测这里是懒惰初始化,第一次使用的时候创建对应的对象
if (currentSession != null) {
return currentSession;
} else {
//如果此前没有创建过session,即currentSession获取不到,则会调用getRequestedSession()方法
S requestedSession = this.getRequestedSession();
if (requestedSession != null) {
if (this.getAttribute(SessionRepositoryFilter.INVALID_SESSION_ID_ATTR) == null) {
requestedSession.setLastAccessedTime(Instant.now());
this.requestedSessionIdValid = true;
currentSession = new SessionRepositoryFilter.SessionRepositoryRequestWrapper.HttpSessionWrapper(requestedSession, this.getServletContext());
currentSession.setNew(false);
this.setCurrentSession(currentSession);
return currentSession;
}
} else {
if (SessionRepositoryFilter.SESSION_LOGGER.isDebugEnabled()) {
SessionRepositoryFilter.SESSION_LOGGER.debug("No session found by id: Caching result for getSession(false) for this HttpServletRequest.");
}
this.setAttribute(SessionRepositoryFilter.INVALID_SESSION_ID_ATTR, "true");
}
if (!create) {
return null;
} else {
if (SessionRepositoryFilter.SESSION_LOGGER.isDebugEnabled()) {
SessionRepositoryFilter.SESSION_LOGGER.debug("A new session was created. To help you troubleshoot where the session was created we provided a StackTrace (this is not an error). You can prevent this from appearing by disabling DEBUG logging for " + SessionRepositoryFilter.SESSION_LOGGER_NAME, new RuntimeException("For debugging purposes only (not an error)"));
}
S session = SessionRepositoryFilter.this.sessionRepository.createSession();
session.setLastAccessedTime(Instant.now());
currentSession = new SessionRepositoryFilter.SessionRepositoryRequestWrapper.HttpSessionWrapper(session, this.getServletContext());
this.setCurrentSession(currentSession);
return currentSession;
}
}
}
...
private S getRequestedSession() {
if (!this.requestedSessionCached) {
List<String> sessionIds = SessionRepositoryFilter.this.httpSessionIdResolver.resolveSessionIds(this);
Iterator var2 = sessionIds.iterator();
while(var2.hasNext()) {
String sessionId = (String)var2.next();
if (this.requestedSessionId == null) {
this.requestedSessionId = sessionId;
}
S session = SessionRepositoryFilter.this.sessionRepository.findById(sessionId);//这里调用sessionRepository即RedisOperationsSessionRepository,因此SpringSession对session的增删改查全部是通过Redis完成的
if (session != null) {
this.requestedSession = session;
this.requestedSessionId = sessionId;
break;
}
}
this.requestedSessionCached = true;
}
return this.requestedSession;
}
...
}
常见支付方式:支付宝支付
站点目录,找不到的直接右上角搜索,阿里是这样的,文档非常复杂而且经常变动,给开发者带来很大的困扰
接入流程
正常的接入流程要按照接入准备的流程接入准备文档]
1️⃣:创建应用[创建一个已经上线的应用]
2️⃣:在应用列表下添加要接入的功能
3️⃣:设置接口加密方式、IP白名单和网关等开发设置
4️⃣:接入功能签约[需要营业执照]
沙箱环境
沙箱环境是一个支付宝内部的安全环境,模拟了所有的支付宝开放平台的功能,正式接入需要提供营业执照和已经上线的业务进行审核,应用没有上线前可以使用沙箱环境来进行调试,相当于支付宝给每个开发人员创建的一个应用
配置沙箱环境
通过沙箱环境文档链接https://opendocs.alipay.com/common/02kkv7
搜索沙箱控制台点击进入沙箱控制台,里面可以看到当前开发人员沙箱环境对应的配置信息
配置参数
APPID
:沙箱环境的应用ID
支付宝网关地址gatewayUrl
:支付需要调用的支付宝网关接口地址,实际生产项目需要将沙箱网关切换为支付宝的线上网关,支付宝沙箱网关https://openapi-sandbox.dl.alipaydev.com/gateway.do
会在alipay
后面加一个dev
merchant_private_key
:商户私钥,也叫应用私钥,现在是直接支付宝开放平台线上生成好的,无需像以前一样下载支付宝开放平台开发助手来生成,老师那时候还是下载该应用手动生成的,以前应用公钥在本地生成以后还要上传到支付宝开放平台,现在也不需要管了
alipay_public_key
:支付宝公钥,线上支付宝开放平台直接生成好的
沙箱账号:这是一个付款账号,支付宝支付需要使用该沙箱账号进行支付,该沙箱账号有100w余额,就是支付环境的买家测试账号,还可以给沙箱账号充值
买家账号:该沙箱环境只能使用该买家账号登录支付宝进行付款
登录密码:买家支付宝账户的登录密码
支付密码:买家支付宝账户的支付密码
页面跳转地址
return_url
:页面跳转同步通知页面路径,支付宝支付成功以后用户要跳转的页面地址,DEMO中的地址为http://localhost:8080/alipay.trade.page.pay-JAVA-UTF-8/return_url.jsp
notify_url
:服务器异步通知页面路径,支付宝支付成功以后会每隔几秒给服务器该接口地址发送一条支付成功的消息来通知服务器用户支付成功了,服务器接收到消息可以根据支付信息对订单进行后续处理http://localhost:8080/alipay.trade.page.pay-JAVA-UTF-8/notify_url.jsp
设置全局的编码格式为UTF-8,避免因为文件编码格式错误导致支付宝返回页总是报错签名错误等各种错误
我们的服务希望被世界上任何一个人都能访问到,正常的实现方式是为服务分配一个公网IP,给公网IP绑定一个域名earlmall.com
,并给域名备案,只要任何一个人访问earlmall.com
,公网上的域名解析器DNS服务器通过域名获取到服务器的公网IP地址;但是这种方式实现起来比较麻烦
ping jd.com
,我们发现返回数据中有京东的公网IP地址111.12.149.108
我们可以访问公网,但是公网是无法直接访问到我们的电脑的,我们和别人聊QQ把消息发送给别人电脑实际上是QQ服务器与别人电脑上的软件建立起的连接,是通过QQ服务器中转的,即还是客户端请求服务端响应那一套,我们无法直接和QQ用户的电脑直接建立连接
我们可以使用内网穿透技术让世界上的任何一个人都访问到我们的当前电脑,
原理
内网穿透服务商会要求在我们的电脑上下载一个服务商软件,该软件可以和服务商服务器建立长连接,内网穿透服务商会为我们电脑上的服务商软件分配一个随机的无需备案的域名[这个域名可能很丑很难看]
临时分配的域名一般是内网穿透服务商的二级或者三级域名,只要内网穿透服务商的顶级域名备了案,子域名无需再备案
别人访问临时域名如haha.hello.com
会先到达内网穿透服务商,服务商根据域名找到分配对应域名的服务软件,通过软件与服务器间建立的长连接通道将请求直接转发到我们的电脑
同理其他电脑也可以通过这种方式让我们能正常访问其他的电脑
最终效果就是使用内网穿透服务商分配的域名实现在公网上通过域名访问我们的主机的效果
使用场景
开发测试,比如微信和支付宝的开发调试
智慧互联,我们在外面无法直接通过公网访问到我们的家用电脑和智能设备,但是我们可以通过内网穿透服务商给路由器分配一个域名,我们可以在任何地方通过该域名找到路由器并给路由器发送命令控制内网中的设备,做智慧家庭云系统
私有云,家庭系统中添加一个远程访问的私有存储数据设备
使用流程
下载服务商软件,这里用natapp
演示
电脑网站支付SDK
使用SDK前最好在本地搭建运行一下支付宝电脑网站支付SDK提供的DEMO,方便熟悉支付的整个流程
DEMO演示
下载DEMO压缩包alipay.trade.page.pay-JAVA-UTF-8.zip
注意该项目解压以后是一个典型的Eclipse应用,只是一个普通工程,不是Maven工程
在Eclipse左侧菜单栏右键--Import--General/Existing Projects into Workspace--Browse选择刚解压的文件夹--选中要引入的项目--设置Options为Copy projects into workspace--Finish将项目导入Eclipse
该项目运行需要Tomcat服务器,项目导入以后会在Servers
界面提示No servers are available
,选择Tomcat来创建一个server服务器,老师选择的是Tomcat 8.5
项目代码
src
目录下只有一个配置类AliPayConfig
,使用支付宝电脑网站支付需要在AliPayConfig
中做很多的配置,只有清除每一个配置的含义才能把项目搭建起来,配置如下
xxxxxxxxxx
package com.alipay.config;
import java.io.FileWriter;
import java.io.IOException;
/* *
*类名:AlipayConfig
*功能:基础配置类
*详细:设置帐户有关信息及返回路径
*修改日期:2017-04-05
*说明:
*以下代码只是为了方便商户测试而提供的样例代码,商户可以根据自己网站的需要,按照技术文档编写,并非一定要使用该代码。
*该代码仅供学习和研究支付宝接口使用,只是提供一个参考。
*/
public class AlipayConfig {
//↓↓↓↓↓↓↓↓↓↓请在这里配置您的基本信息↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
// 应用ID,您的APPID,收款账号既是您的APPID对应支付宝账号
public static String app_id = "";
// 商户私钥,您的PKCS8格式RSA2私钥
public static String merchant_private_key = "";
// 支付宝公钥,查看地址:https://openhome.alipay.com/platform/keyManage.htm 对应APPID下的支付宝公钥。
public static String alipay_public_key = "";
// 服务器异步通知页面路径 需http://格式的完整路径,不能加?id=123这类自定义参数,必须外网可以正常访问
public static String notify_url = "http://工程公网访问地址/alipay.trade.page.pay-JAVA-UTF-8/notify_url.jsp";
// 页面跳转同步通知页面路径 需http://格式的完整路径,不能加?id=123这类自定义参数,必须外网可以正常访问
public static String return_url = "http://工程公网访问地址/alipay.trade.page.pay-JAVA-UTF-8/return_url.jsp";
// 签名方式
public static String sign_type = "RSA2";
// 字符编码格式
public static String charset = "utf-8";
// 支付宝网关
public static String gatewayUrl = "https://openapi.alipay.com/gateway.do";
// 支付宝网关
public static String log_path = "C:\\";
//↑↑↑↑↑↑↑↑↑↑请在这里配置您的基本信息↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
/**
* 写日志,方便测试(看网站需求,也可以改成把记录存入数据库)
* @param sWord 要写入日志里的文本内容
*/
public static void logResult(String sWord) {
FileWriter writer = null;
try {
writer = new FileWriter(log_path + "alipay_log_" + System.currentTimeMillis()+".txt");
writer.write(sWord);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (writer != null) {
try {
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
该DEMO的所有代码都在WebContent
目录下,所有的代码包括前端代码都被统一放在JSP页面中了,要做支付直接把对应JSP的代码复制粘贴处理成我们自己的页面即可
核心概念
身份:
app_id
:使用支付宝需要创建应用,创建的应用在管理中心的网页和移动应用列表,在应用列表中可以看到每个应用列表的APPID
,在每个应用详情的图标下方也会显示应用的APPID
,作为当前应用的唯一标识
加密:服务器和支付宝之间会传递金融数据,对网络传输的加密要求比较高,我们在配置文件中配置的就是非对称加密算法的客户端的请求加密密钥和响应解密密钥
🔎:根据非对称加密算法支付宝和一个客户端之间总共有两对即四把密钥,RSA算法一次生成一对密钥,一把公钥、一把私钥;
加密过程:商户保存商户公钥,发起请求时结合请求参数和商户请求标识使用商户公钥加密生成一个签名,该签名只要请求参数发生变化签名就会发生变化[比如一个藏在签名中的本次操作标识拼接请求参数整体做MD5加密用商户公钥做成签名,网络传输过程中不法组织没有商户私钥获取不到签名中的本次操作标识无法伪造签名,也无法使用以往签名替代本次签名,我们就可以通过签名来验证用户参数是否发生过篡改,即使不法组织直接用请求参数密文替换当前的请求参数对应密文签名验证也无法通过],服务器接收数据验证签名后处理业务,业务处理完成使用支付宝私钥结合本次操作标识和响应参数生成签名,该签名可以在网络传输过程被泄露的支付宝公钥解密[如果能被篡改可以根据客户端的逻辑比如操作失败用户可能还会继续选择重试],但是不法组织即使篡改了数据也因为没有私钥无法为篡改后的数据加密,也无法用以往交易的唯一标识结合错误响应结果的签名来替换本次的响应密文,因此请求和响应数据都是安全的,响应数据到达客户端以后使用支付宝公钥来解密响应密文
这里老师的逻辑是有漏洞的,因为签名也是可以替换的,除非服务器和客户端在发起请求以前有公共的唯一标识,这个唯一标识第三方还无法破解才能保证签名对应的密文不会被替换[这里结合JWT的思路可能会更清晰,使用随机数是不行的,因为随机数就算存在签名中无法被获取,但是签名也能整体被替换,只要把以前的请求参数和签名密文整体换掉就验不出来,除非交易前客户端和服务端都保存了标识本次交易的唯一标识并用该标识生成签名]
注意这里有歧义,支付宝的加密是私钥进行加密,公钥进行解密;不是保存在生产者中的是私钥
注意只有支付宝私钥是没有人能看到的,其他的所有3把密钥商户都能看到,反正只要不知道支付宝私钥,整个通信过程就是安全的
merchant_private_key
:商户私钥,也叫应用私钥,现在是直接支付宝开放平台线上生成好的,无需像以前一样下载支付宝开放平台开发助手来生成,老师那时候还是下载该应用手动生成的,以前应用公钥在本地生成以后还要上传到支付宝开放平台,现在也不需要管了
alipay_public_key
:支付宝公钥,线上支付宝开放平台直接生成好的
沙箱账号:这是一个付款账号,支付宝支付需要使用该沙箱账号进行支付,该沙箱账号有100w余额,就是支付环境的买家测试账号,还可以给沙箱账号充值
买家账号:该沙箱环境只能使用该买家账号登录支付宝进行付款
登录密码:买家支付宝账户的登录密码
支付密码:买家支付宝账户的支付密码
页面跳转地址
return_url
:页面跳转同步通知页面路径,支付宝支付成功以后用户要跳转的页面地址,DEMO中的地址为http://localhost:8080/alipay.trade.page.pay-JAVA-UTF-8/return_url.jsp
notify_url
:服务器异步通知页面路径,支付宝支付成功以后会每隔几秒给服务器该接口地址发送一条支付成功的消息来通知服务器用户支付成功了,服务器接收到消息可以根据支付信息对订单进行后续处理http://localhost:8080/alipay.trade.page.pay-JAVA-UTF-8/notify_url.jsp
设置全局的编码格式为UTF-8,避免因为文件编码格式错误导致支付宝返回页总是报错签名错误等各种错误
整合支付宝支付功能
页面跳转
点击支付宝付款发起POST请求跳转地址alipay.trade.page.pay.jsp
alipay.trade.page.pay.jsp
xxxxxxxxxx
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<title>付款</title>
</head>
<%page language="java" contentType="text/html; charset=utf-8" pageEncoding="utf-8"%>
<%page import="com.alipay.config.*"%>
<%page import="com.alipay.api.*"%>
<%page import="com.alipay.api.request.*"%>
<%
//使用配置类AlipayConfig中配置的各种配置构造初始化的AlipayClient对象
AlipayClient alipayClient = new DefaultAlipayClient(AlipayConfig.gatewayUrl, AlipayConfig.app_id, AlipayConfig.merchant_private_key, "json", AlipayConfig.charset, AlipayConfig.alipay_public_key, AlipayConfig.sign_type);
//构造支付请求并设置页面跳转参数
AlipayTradePagePayRequest alipayRequest = new AlipayTradePagePayRequest();
alipayRequest.setReturnUrl(AlipayConfig.return_url);
alipayRequest.setNotifyUrl(AlipayConfig.notify_url);
//从index.jsp的名为alipayment的标签发起的请求中获取订单号、付款金额、订单名称、商品描述,并将这些参数设置到bizContent参数中
//商户订单号,商户网站订单系统中唯一订单号,必填
String out_trade_no = new String(request.getParameter("WIDout_trade_no").getBytes("ISO-8859-1"),"UTF-8");
//付款金额,必填
String total_amount = new String(request.getParameter("WIDtotal_amount").getBytes("ISO-8859-1"),"UTF-8");
//订单名称,必填
String subject = new String(request.getParameter("WIDsubject").getBytes("ISO-8859-1"),"UTF-8");
//商品描述,可空
String body = new String(request.getParameter("WIDbody").getBytes("ISO-8859-1"),"UTF-8");
alipayRequest.setBizContent("{\"out_trade_no\":\""+ out_trade_no +"\","
+ "\"total_amount\":\""+ total_amount +"\","
+ "\"subject\":\""+ subject +"\","
+ "\"body\":\""+ body +"\","
+ "\"product_code\":\"FAST_INSTANT_TRADE_PAY\"}");
//若想给BizContent增加其他可选请求参数,以增加自定义超时时间参数timeout_express来举例说明
//alipayRequest.setBizContent("{\"out_trade_no\":\""+ out_trade_no +"\","
// + "\"total_amount\":\""+ total_amount +"\","
// + "\"subject\":\""+ subject +"\","
// + "\"body\":\""+ body +"\","
// + "\"timeout_express\":\"10m\","
// + "\"product_code\":\"FAST_INSTANT_TRADE_PAY\"}");
//请求参数可查阅【电脑网站支付的API文档-alipay.trade.page.pay-请求参数】章节
//使用AlipayClient客户端发起请求获取响应体数据并输出
String result = alipayClient.pageExecute(alipayRequest).getBody();
//这个输出是直接输出到客户端页面
out.println(result);
%>
<body>
</body>
</html>
整合流程
1️⃣:引入支付宝支付SDK,即com.alipay.sdk:alipay-sdk-java
,版本与老师保持一致
xxxxxxxxxx
<!--导入com.alipay.sdk:alipay-sdk-java支付宝的电脑网站支付开发工具包SDK-->
<!-- https://mvnrepository.com/artifact/com.alipay.sdk/alipay-sdk-java -->
<dependency>
<groupId>com.alipay.sdk</groupId>
<artifactId>alipay-sdk-java</artifactId>
<version>4.9.28.ALL</version>
</dependency>
2️⃣:将DEMO中的支付请求发起逻辑代码和支付宝支付配置类AliPayConfig
封装成一个工具类AlipayTemplate
支付宝的响应内容即result
的内容如下
响应的是一个表单,该表单封装了用户支付的所有数据,用户浏览器只要一收到该表单,就会执行脚本document.forms[0].sumbit()
直接提交该表单给支付宝,支付宝会直接响应给用户对应的收银页面
因此我们直接将支付宝返回的响应体直接返回给用户,用户客户端就会直接提交表单给支付宝,然后支付宝直接向用户客户端响应收银页面
要特别注意,因为我们响应给用户的数据是支付宝发过来的表单数据,不是一个json
数据,@ResponseBody
是将对象转换成json
格式数据响应并更改响应头中的数据类型为application/json
,因此在使用@ResponseBody
响应字符串对象的同时我们还要通过@GetMapping(value="/order/pay",produces="text/html")
或者@GetMapping(value ="/order/pay", produces = MediaType.TEXT_HTML_VALUE)
来指定响应数据的类型为text/html
,这样浏览器就不会将数据作为application/json
数据展示,而是直接作为HTML页面开始渲染
xxxxxxxxxx
<form name="punchout_form" method="post" action="https://openapi.alipaydev.com/gateway.do?charset=utf-8&method=alipay.trade.page.pay&sign=gvzA31MGat6of4f49TEAEEhpBmhwSkO699g3imrIuM3qogRfGzpNS3T5JyX9JhxF%2B1BDdZ0%2F3mR3K%2FR2QkhoCpATRxaSk9JWbPoZxsZxLvlb%2F9Ld%2BofXN4FmXfD82es%2BkFmlnxQ6BosznJNtV6eh4hdcoSyAt1YtXpGU%2ByOZKWImoZlrD1vuCY6mN9KyehfIy6hq531oUYVixn81%2FlXRnZ6Ffq%2BXaKMWHhOICeWgXRm25c4AJDnHmwmijCYr6%2FG%2F2HyQbY%2FpjLfmD2EQn7CUSIAOOpOYUQQCM1rXtINYZPoPZCdVOqhdGphi0IbLw2VQRZV%2FXxlRYfmJieFbBHrxJg%3D%3D&return_url=http%3A%2F%2Forder.earlmall.com%2Fpaid%2Fnotify¬ify_url=http%3A%2F%2Fuser.earlmall.com%2Fuser%2Forder%2Flist.html&version=1.0&app_id=9021000142643535&sign_type=RSA2×tamp=2024-12-05+23%3A54%3A18&alipay_sdk=alipay-sdk-java-dynamicVersionNo&format=json">
<input type="hidden" name="biz_content" value="{"out_trade_no":"202412052346288201864697859702534145","total_amount":"12004.00","subject":"华为 HUAWEI Mate60 Pro 星河银 128G","body":"颜色: 星河银;内存: 128G","product_code":"FAST_INSTANT_TRADE_PAY","time_expire":"2024-12-05 08:16:29"}">
<input type="submit" value="立即支付" style="display:none" >
</form>
<script>document.forms[0].submit();</script>
[AlipayTemplate]
xxxxxxxxxx
package com.earl.mall.order.config;
import com.alipay.api.AlipayApiException;
import com.alipay.api.AlipayClient;
import com.alipay.api.DefaultAlipayClient;
import com.alipay.api.internal.util.AlipaySignature;
import com.alipay.api.request.AlipayTradePagePayRequest;
import com.earl.mall.order.vo.AlipayVo;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import javax.servlet.http.HttpServletRequest;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
* @author Earl
* @version 1.0.0
* @描述 通过注解`@ConfigurationProperties(prefix = "alipay")`将该组件中的属性都绑定到配置文件前缀alipay上
* @创建日期 2024/12/04
* @since 1.0.0
*/
prefix = "alipay") (
public class AlipayTemplate {
/**
* 在支付宝创建的应用的id
*/
private String app_id = "9021000142643535";
/**
* 商户私钥,您的PKCS8格式RSA2私钥
*/
private String merchant_private_key = "XXX";
/**
* 支付宝公钥,查看地址:https://openhome.alipay.com/platform/keyManage.htm 对应APPID下的支付宝公钥。
*/
private String alipay_public_key = "XXX";
/**
* 服务器[异步通知]页面路径 需http://格式的完整路径,不能加?id=123这类自定义参数,必须外网可以正常访问支付宝会悄悄的给我们发送一个请求,告诉我们支付成功的信息
*/
private String notify_url="http://b4qi64.natappfree.cc/paid/notify";
/**
* 页面跳转同步通知页面路径 需http://格式的完整路径,不能加?id=123这类自定义参数,必须外网可以正常访问同步通知,支付成功,一般跳转到成功页
*/
private String return_url="http://user.earlmall.com/user/order/list.html";
/**
* 签名方式
*/
private String sign_type = "RSA2";
/**
* 字符编码格式
*/
private String charset = "utf-8";
/**
* 支付宝网关; https://openapi.alipaydev.com/gateway.do
*/
private String gatewayUrl = "https://openapi-sandbox.dl.alipaydev.com/gateway.do";
public String pay(AlipayVo vo) throws AlipayApiException {
//AlipayClient alipayClient = new DefaultAlipayClient(AlipayTemplate.gatewayUrl, AlipayTemplate.app_id, AlipayTemplate.merchant_private_key, "json", AlipayTemplate.charset, AlipayTemplate.alipay_public_key, AlipayTemplate.sign_type);
//1、根据支付宝的配置生成一个支付客户端
AlipayClient alipayClient = new DefaultAlipayClient(gatewayUrl,
app_id, merchant_private_key, "json",
charset, alipay_public_key, sign_type);
//2、创建一个支付请求 //设置请求参数
AlipayTradePagePayRequest alipayRequest = new AlipayTradePagePayRequest();
alipayRequest.setReturnUrl(return_url);
alipayRequest.setNotifyUrl(notify_url);
//商户订单号,商户网站订单系统中唯一订单号,必填
String out_trade_no = vo.getOut_trade_no();
//付款金额,必填
String total_amount = vo.getTotal_amount();
//订单名称,必填
String subject = vo.getSubject();
//商品描述,可空
String body = vo.getBody();
//绝对关单时间
String time_expire = vo.getTime_expire();
alipayRequest.setBizContent("{\"out_trade_no\":\""+ out_trade_no +"\","
+ "\"total_amount\":\""+ total_amount +"\","
+ "\"subject\":\""+ subject +"\","
+ "\"body\":\""+ body +"\","
+ "\"product_code\":\"FAST_INSTANT_TRADE_PAY\","
+ "\"time_expire\":\""+ time_expire +"\"}");
String result = alipayClient.pageExecute(alipayRequest).getBody();
//会收到支付宝的响应,响应的是一个页面,只要浏览器显示这个页面,就会自动来到支付宝的收银台页面
//System.out.println("支付宝的响应:"+result);
return result;
}
/**
* @param request
* @return boolean
* @描述 验证支付宝异步回调的支付宝签名
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/06
* @since 1.0.0
*/
public boolean verifySignature(HttpServletRequest request) throws AlipayApiException {
//获取支付宝POST过来反馈信息,将支付宝返回的所有请求参数都封装到一个Map集合
Map<String,String> params = new HashMap<String,String>();
Map<String,String[]> requestParams = request.getParameterMap();
for (Iterator<String> iter = requestParams.keySet().iterator(); iter.hasNext();) {
String name = (String) iter.next();
String[] values = (String[]) requestParams.get(name);
String valueStr = "";
for (int i = 0; i < values.length; i++) {
valueStr = (i == values.length - 1) ? valueStr + values[i]
: valueStr + values[i] + ",";
}
//乱码解决,这段代码在出现乱码时使用,注意这行代码只能在乱码的时候再执行
//valueStr = new String(valueStr.getBytes("ISO-8859-1"), "utf-8");
params.put(name, valueStr);
}
//调用alipaySignature.rsaCheckV1()方法来进行验签,这里面使用了AlipayConfig配置文件中的属性,我们是单独封装了一个AlipayTemplate,注意进行替换,验签会返回验证的结果,如果验签成功signVerified为true说明这是支付宝发回来的数据可以执行业务方法,如果验签失败说明signVerified为false说明这个数据有问题,不是支付宝返回的数据不能执行业务方法
return AlipaySignature.rsaCheckV1(params, alipay_public_key, charset, sign_type); //调用SDK验证签名
}
}
3️⃣:准备VO类封装支付参数
xxxxxxxxxx
/**
* @author Earl
* @version 1.0.0
* @描述 支付宝收银页参数
* @创建日期 2024/12/05
* @since 1.0.0
*/
public class AlipayVo {
/**
* 商户订单号 必填
*/
private String out_trade_no;
/**
* 订单名称 必填
*/
private String subject;
/**
* 付款金额 必填
*/
private String total_amount;
/**
* 商品描述 可空
*/
private String body;
/**
* 绝对关单时间
*/
private String time_expire;
}
4️⃣:在支付页给支付宝图片设置一个超链接,超链接的跳转地址th:href="'http://order.earlmall.com/order/pay?orderSn='+${pay.orderSn}"
xxxxxxxxxx
<a th:href="'order/alipay/'+${session.pay.order.orderSn}">
<img src="/static/order/pay/img/zhifubao.png" style="weight:auto;height:30px;" alt="">支付宝
</a>
5️⃣:处理支付逻辑
支付只需要调用alipayTemplate.pay(payVo)
传参PayVo
封装的支付参数即可,需要传参订单号,订单备注、订单主题[订单主题会在用户付款页显示]、订单金额;前端传参只传了订单号,我们希望从数据库中查询出对应的订单以上信息
实际上这里还应该校验订单的支付状态,支付过了就不允许支付了,避免用户重复支付
注意支付宝要求支付金额必须精确为两位小数,小数位多了少了都会直接报错,通过bigDecimal.setScale(2)
设置小数位数为2位,取两位小数时我们可以通过bigDecimal.setScale(2,BigDecimal.ROUND_UP)
让金额最后一位向上取值,而且注意支付宝要求的支付金额数据类型是String
类型,可以通过bigDecimal.toString()
将BigDecimal
类型数据转换成String
类型数据
老师的订单主题是直接拿订单项列表的第一个的商品sku
名称作为支付订单的主题,这有点low
订单备注老师也是直接拿第一个订单项的销售属性来糊弄的
响应的是一个表单,该表单封装了用户支付的所有数据,用户浏览器只要一收到该表单,就会执行脚本document.forms[0].sumbit()
直接提交该表单给支付宝,支付宝会直接响应给用户对应的收银页面
因此我们直接将支付宝返回的响应体直接返回给用户,用户客户端就会直接提交表单给支付宝,然后支付宝直接向用户客户端响应收银页面
要特别注意,因为我们响应给用户的数据是支付宝发过来的表单数据,不是一个json
数据,@ResponseBody
是将对象转换成json
格式数据响应并更改响应头中的数据类型为application/json
,因此在使用@ResponseBody
响应字符串对象的同时我们还要通过@GetMapping(value="/order/pay",produces="text/html")
或者@GetMapping(value ="/order/pay", produces = MediaType.TEXT_HTML_VALUE)
来指定响应数据的类型为text/html
,这样浏览器就不会将数据作为application/json
数据展示,而是直接作为HTML页面开始渲染
我们模拟用户支付的时候必须使用沙箱账户支付
电商网站用户成功支付以后已经跳转用户的支付列表页,也就是上面用户同步通知页return_url
[控制器方法]
xxxxxxxxxx
/**
* @param orderSn
* @return {@link String }
* @描述 验证订单状态并调用支付宝接口响应收银页
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/05
* @since 1.0.0
*/
value = "/order/alipay/{orderSn}",produces = MediaType.TEXT_HTML_VALUE) (
public String orderAlipay( ("orderSn") String orderSn){
String result = orderWebService.orderAlipay(orderSn);
return result;
}
[业务实现类]
xxxxxxxxxx
/**
* @param orderSn
* @return {@link String }
* @描述 检查订单状态并调用支付宝接口响应收银页
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/05
* @since 1.0.0
*/
public String orderAlipay(String orderSn) throws RRException{
//1. 检查订单状态
OrderEntity order = orderService.getOrderByOrderSn(orderSn);
//订单状态不是未支付直接抛出异常
if(order.getStatus()!=0){
throw new RRException(StatusCode.ORDER_PAY_STATUS_EXCEPTION.getMsg()+orderSn+OrderConstant.OrderStatus.getMsgByCode(order.getStatus()),
StatusCode.ORDER_PAY_STATUS_EXCEPTION.getCode());
}
//封装支付参数
AlipayVo alipayParams = new AlipayVo();
alipayParams.setOut_trade_no(orderSn);
alipayParams.setTotal_amount(order.getPayAmount().setScale(2, BigDecimal.ROUND_UP).toString());
List<OrderItemEntity> orderItems = orderItemService.list(new QueryWrapper<OrderItemEntity>().eq("order_sn", orderSn));
alipayParams.setSubject(orderItems.get(0).getSkuName());
alipayParams.setBody(orderItems.get(0).getSkuAttrsVals());
//格式化关单绝对时间
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
alipayParams.setTime_expire(order.getTimeExpire().format(dtf));
String result = null;
//调用支付方法发起支付请求
try {
result = alipayTemplate.pay(alipayParams);
} catch (AlipayApiException e) {
e.printStackTrace();
}
return result;
}
6️⃣:支付成功跳转
用户支付成功我们希望跳转用户的订单列表页,我们把订单列表页做在会员系统中,设置支付的return_url=http://user.earlmall.com/user/order/list.html
,当用户支付成功以后支付宝会重定向回商户的该界面,并且会在该地址后面拼接用户支付的订单号以及支付的签名,我们可以在响应该重定向页面的同时拿着订单号和支付宝签名验证用户支付状态并修改订单状态,主要是验证支付宝签名,只有支付宝签名正确的情况下才说明订单号是正确的,用户确实完成了支付,但是我们有更好的实现方法[如果靠用户浏览器重定向更改订单状态如果用户支付以后立即关闭浏览器我们就收不到用户浏览器发起的重定向请求,因此这种方式更改订单状态不可靠]
7️⃣:异步通知更改订单状态
用户成功支付以后支付宝会每隔几秒就给我们提供的服务器异步通知路径notify_url
发起POST请求,将支付结果作为参数通知商户,所有的参数和介绍都在https://opendocs.alipay.com/open/270/105902?pathHash=d5cd617e
程序执行完毕本次请求必须给支付宝响应字符串success
这七个字符[不能响应页面,只能响应字符串对象],否则支付宝会不断发起该请求,会在25小时内发起8次请求,发送时间间隔分别为4m
、10m
、10m
、1h
、2h
、6h
、15h
,这种事务就是跨系统间的分布式事务,支付宝负责最大努力通知我们支付成功,我们用软一致性还保证数据的最终一致性,支付宝采取商户手动应答的策略来确保消息不容易丢失,这是最大努力通知型方案,不是一定保证最终一致性的方案
因为支付宝要访问我们的接口,因此我们必须保证自己的接口能在外网访问,因此必须使用内网穿透的网址来让支付宝能在公网上访问到我们的接口地址notify_url=http://内网穿透域名/order/paid/notify
,把内网穿透域名配置成内网主机域名order.earlmall.com
,端口设置成80端口,让请求内网穿透到本机的虚拟机上的nginx
上,
但是内网穿透不是浏览器发起的请求,没有携带请求头,或者就算携带了请求头也是携带的内网穿透服务商分配的域名,nginx
无法从请求头中获取我们指定的Host信息order.earlmall.com
,给内网穿透域名商配置的也只是为了让内网穿透域名商找到内网IP和对应服务端口,Nginx无法根据请求的域名来路由用户请求到网关[因为外网请求实际访问的是域名服务商分配的域名],我们可以在Nginx中做一个精确配置[Nginx优先进行精确匹配],让指定URI为/payed/
的所有支付宝请求直接转发到商城网关,请求经过nginx不再携带原来的Host地址,选择使用自定义的Host地址proxy_set_header Host order.earlmall.com;
经过验证,内网穿透内网穿透服务商只是转发原来的用户请求,原来用户请求的HOSt就是域名服务商分配的二级域名,域名服务商并没有将其改成我们自己的内网域名或者IP
同时server_name
还要配置请求的域名为内网穿透域名,试一下不配置会不会报错
接口必须为POST方式,必须使用@ResponseBody
或者@RestController
响应字符串对象
拦截器放行对应接口,不检查登录状态
就是用户订单列表页面搭建在用户服务中
部署页面[老艺能,不多说,闭着眼睛都能搞]
动静分离
Thymeleaf
渲染
视图页面跳转
配置本地域名映射,商城网关对域名user.earlmall.com
的跳转
把所有订单页面按钮路由到订单列表页
为用户服务配置用户登录拦截器并将拦截器注册到容器组件中
用户状态是使用SpringSession
来协调存储的,要用拦截器必须要引入SpringSession
,否则无法从本地session
中获取到用户的登录状态
还要把Session的相关配置比如json
序列化器、session过期时间等拷贝到用户服务中
配置redis
计算运费的时候调用了用户服务,而且那个是做收货地址的地址查询不需要做用户登录检查,在烂机器中排除掉对应的接口地址
编写后端接口远程调用订单服务分页查询用户所有的订单数据
renren-fast
生成的分页查询接口太粗糙,需要对该功能进行扩展,查询条件包括
用户ID等于当前登录用户的id
用户服务设置Feign请求拦截器,将当前登录用户的cookie设置到远程调用请求的请求头中
查询到的订单数据按照订单自增id降序排列,这样总是能拿到最新的订单数据
只有订单数据不够,还需要订单下的所有订单项数据,订单数据封装在IPage的records属性中,是一个list集合,我们可以取出每个订单数据按照订单号查出每个订单下的所有订单项并重新封装覆盖原来的records属性
分页参数使用Map<String,Object>
进行封装,传参当前页码,没有传参当前页码就默认第一页;
用户服务将查询到的数据放到ModelAndView
请求域中
注意请求数据要使用@RequestBody
来从请求体中获取必须使用POST请求方式,可以使用@PostMapping
,也可以使用@RequestMapping
[检验一下@GetMapping
能不能用@RequestBody
]
给数据库表对应实体类添加数据库没有的字段需要使用注解@TableField(exist=false)
表示数据库内没有该字段
要获取到分页数据的总记录数和总页数,需要配置MyBatisPlus
的拦截器[就是MyBatisPlus
的分页插件],这一块可以完全参考以前的多条件检索分页查询的接口做,包括拦截器也直接参考那个,老师这里没有做多条件查询匹配,其实做的并不好
页面渲染
将订单数据渲染到table
列表组件中
第一个tr
标签是订单信息
订单号
商城名称写死
第二个tr
标签是订单项信息
sku
图片
sku
名称
注意这个用法,设置段落宽度,文字内容超过指定宽度自动换行
xxxxxxxxxx
<p style="width: 242px;height: auto;overflow: auto">
[[${item.skuName}]]
</p>
商品购买数量
收货人姓名
交易总金额,支付方式
订单状态
遍历显示列表,有些数据比如收货人姓名,交易总额,支付方式、订单状态等只需要显示一次,遍历的时候会导致每行都显示,我们可以通过如下设置来让部分内容跨几个订单项只显示一次
th:each="item,itemStatus:order.items"
的itemStatus
可以拿到当前标签的数据遍历状态,其中变量index
表示当前正在遍历第几个元素,从0开始;count
是已经遍历的元素计数,从1开始;size
表示元素总共有几个;我们让只需要展示一遍的数据第一遍遍历的时候显示th:if="${itemStatus.index==0}"
,第一行的数据直接跨所有的列进行展示th:rowspan="${itemStatus.size}"
,后续遍历对应组件因为index
不为0就不会展示了
右边框掉了,我们通过设置td
标签的style
属性即style="border-right: 1px solid #ccc"
来设置右边框边界线
业务实现
相关自定义工具类
[Query
]
注意IPage
类是MP下的
xxxxxxxxxx
/**
* 查询参数
*
* @author Mark sunlightcs@gmail.com
*/
public class Query<T> {
public IPage<T> getPage(Map<String, Object> params) {
return this.getPage(params, null, false);
}
public IPage<T> getPage(Map<String, Object> params, String defaultOrderField, boolean isAsc) {
//分页参数
long curPage = 1;
long limit = 10;
if(params.get(Constant.PAGE) != null){
curPage = Long.parseLong((String)params.get(Constant.PAGE));
}
if(params.get(Constant.LIMIT) != null){
limit = Long.parseLong((String)params.get(Constant.LIMIT));
}
//分页对象
Page<T> page = new Page<>(curPage, limit);
//分页参数
params.put(Constant.PAGE, page);
//排序字段
//防止SQL注入(因为sidx、order是通过拼接SQL实现排序的,会有SQL注入风险)
String orderField = SQLFilter.sqlInject((String)params.get(Constant.ORDER_FIELD));
String order = (String)params.get(Constant.ORDER);
//前端字段排序
if(StringUtils.isNotEmpty(orderField) && StringUtils.isNotEmpty(order)){
if(Constant.ASC.equalsIgnoreCase(order)) {
return page.addOrder(OrderItem.asc(orderField));
}else {
return page.addOrder(OrderItem.desc(orderField));
}
}
//没有排序字段,则不排序
if(StringUtils.isBlank(defaultOrderField)){
return page;
}
//默认排序
if(isAsc) {
page.addOrder(OrderItem.asc(defaultOrderField));
}else {
page.addOrder(OrderItem.desc(defaultOrderField));
}
return page;
}
}
[PageUtils
]
xxxxxxxxxx
/**
* 分页工具类
*
* @author Mark sunlightcs@gmail.com
*/
public class PageUtils implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 总记录数
*/
private int totalCount;
/**
* 每页记录数
*/
private int pageSize;
/**
* 总页数
*/
private int totalPage;
/**
* 当前页数
*/
private int currPage;
/**
* 列表数据
*/
private List<?> list;
/**
* 分页
* @param list 列表数据
* @param totalCount 总记录数
* @param pageSize 每页记录数
* @param currPage 当前页数
*/
public PageUtils(List<?> list, int totalCount, int pageSize, int currPage) {
this.list = list;
this.totalCount = totalCount;
this.pageSize = pageSize;
this.currPage = currPage;
this.totalPage = (int)Math.ceil((double)totalCount/pageSize);
}
/**
* 分页
*/
public PageUtils(IPage<?> page) {
this.list = page.getRecords();
this.totalCount = (int)page.getTotal();
this.pageSize = (int)page.getSize();
this.currPage = (int)page.getCurrent();
this.totalPage = (int)page.getPages();
}
public int getTotalCount() {
return totalCount;
}
public void setTotalCount(int totalCount) {
this.totalCount = totalCount;
}
public int getPageSize() {
return pageSize;
}
public void setPageSize(int pageSize) {
this.pageSize = pageSize;
}
public int getTotalPage() {
return totalPage;
}
public void setTotalPage(int totalPage) {
this.totalPage = totalPage;
}
public int getCurrPage() {
return currPage;
}
public void setCurrPage(int currPage) {
this.currPage = currPage;
}
public List<?> getList() {
return list;
}
public void setList(List<?> list) {
this.list = list;
}
}
[控制器方法]
xxxxxxxxxx
/**
* @return {@link R }
* @描述 根据用户id获取用户订单列表
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/06
* @since 1.0.0
*/
"/get/order/list/{userId}") (
public R getOrderListByUserId( ("userId") Long userId,
value = "page",defaultValue = "1") String page, (
value = "limit",defaultValue = "10") String limit){ (
HashMap<String, Object> params = new HashMap<>();
params.put("page",page);
params.put("limit",limit);
PageUtils pageUtils=orderService.getOrderListByUserId(userId,params);
return R.ok().put("page",pageUtils);
}
[业务实现类]
xxxxxxxxxx
/**
* @param userId
* @return {@link PageUtils }
* @描述 根据用户id分页查询获取订单列表
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/06
* @since 1.0.0
*/
public PageUtils getOrderListByUserId(Long userId,Map<String, Object> params) {
QueryWrapper<OrderEntity> wrapper = new QueryWrapper<>();
IPage<OrderEntity> page = this.page(
new Query<OrderEntity>().getPage(params),
wrapper.eq("member_id",userId).orderByDesc("id")
);
List<OrderEntity> orderEntities = page.getRecords();
List<String> orderSns = orderEntities.stream().map(OrderEntity::getOrderSn).collect(Collectors.toList());
List<OrderItemEntity> orderItems = orderItemService.list(new QueryWrapper<OrderItemEntity>().in("order_sn", orderSns));
List<OrderItemVo> orderItemVos = orderItems.stream().map(orderItemEntity -> {
OrderItemVo orderItem = new OrderItemVo();
BeanUtils.copyProperties(orderItemEntity, orderItem);
return orderItem;
}).collect(Collectors.toList());
HashMap<String, List<OrderItemVo>> orderItemsOfOrderSns = new HashMap<>();
for (OrderItemVo orderItem : orderItemVos) {
if(orderItemsOfOrderSns.get(orderItem.getOrderSn())==null){
ArrayList<OrderItemVo> orderItemsOfOrderSn = new ArrayList<>();
orderItemsOfOrderSns.put(orderItem.getOrderSn(),orderItemsOfOrderSn);
}
orderItemsOfOrderSns.get(orderItem.getOrderSn()).add(orderItem);
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
List<OrderVo> orders = orderEntities.stream().map(orderEntity -> {
OrderVo order = new OrderVo();
BeanUtils.copyProperties(orderEntity, order);
order.setOrderCreateTime(sdf.format(orderEntity.getCreateTime()));
return order;
}).collect(Collectors.toList());
orders.sort((o1, o2) -> Long.compare(o2.getId(),o1.getId()));
for (OrderVo order : orders) {
order.setOrderItems(orderItemsOfOrderSns.get(order.getOrderSn()));
}
Page<OrderVo> orderPage = new Page<>(page.getCurrent(), page.getSize(), page.getTotal());
orderPage.setRecords(orders);
return new PageUtils(orderPage);
}
[数据渲染]
xxxxxxxxxx
<table class="table" th:each="order:${page.list}">
<tr>
<td colspan="7" style="background:#F7F7F7" >
<span style="color:#AAAAAA" th:text="${order.orderCreateTime}">2017-12-09 20:50:10</span>
<span><ruby style="color:#AAAAAA">订单号:</ruby> <span th:text="${order.orderSn}">70207298274</span></span>
<span>Earl商城官方旗舰店<i class="table_i"></i></span>
<i class="table_i5 isShow"></i>
</td>
</tr>
<tr class="tr" th:each="orderItem,itemStatus:${order.orderItems}">
<td colspan="3">
<img style="width: 60px;height: 60px;" th:src="${orderItem.skuPic}" alt="" class="img">
<div>
<p style="width: 242px;height: auto;overflow: auto;" th:text="${orderItem.skuName}">MUXIWEIERPU皮手套男冬季加绒保暖户外骑车开车触摸屏全指防寒全指手套 黑色 均码</p>
<div><i class="table_i4"></i>找搭配</div>
</div>
<div style="margin-left:15px;" th:text="'x'+${orderItem.skuQuantity}">x1</div>
<div style="clear:both"></div>
</td>
<td th:text="${order.receiverName}" th:if="${itemStatus.index==0}" th:rowspan="${itemStatus.size}">张三<i><i class="table_i1"></i></i></td>
<td style="padding-left:10px;color:#AAAAB1;" th:if="${itemStatus.index==0}" th:rowspan="${itemStatus.size}">
<p style="margin-bottom:5px;" th:text="'总额 ¥'+${order.payAmount}">总额 ¥26.00</p>
<hr style="width:90%;">
<p>在线支付</p>
</td>
<td th:if="${itemStatus.index==0}" th:rowspan="${itemStatus.size}">
<ul>
<li style="color:#71B247;" th:if="${order.status==0}">待付款</li>
<li style="color:#71B247;" th:if="${order.status==1}">待发货</li>
<li style="color:#71B247;" th:if="${order.status==2}">已发货</li>
<li style="color:#71B247;" th:if="${order.status==3}">已完成</li>
<li style="color:#71B247;" th:if="${order.status==4}">已关闭</li>
<li style="color:#71B247;" th:if="${order.status==5}">无效订单</li>
<li class="tdLi">订单详情</li>
</ul>
</td>
<td>
<button>确认收货</button>
<p style="margin:4px 0; ">取消订单</p>
<p>催单</p>
</td>
</tr>
</table>
[分页组件]
xxxxxxxxxx
<div class="order_btm">
<div>
<button class="page_a" th:attr="pn=${page.currPage - 1}"
th:if="${page.currPage > 1}">上一页</button>
<span class="page_a" th:attr="pn=${i},style=${i == page.currPage?'color: red;':'color: black;'}"
th:each="i:${#numbers.sequence(1,page.totalPage)}"
th:if="${page.totalPage > 0}"
th:text="${i}">1</span>
<button class="page_a" th:attr="pn=${page.currPage + 1}"
th:if="${page.currPage < page.totalPage}">下一页</button>
</div>
</div>
<script>
$(".page_a").click(function (){
var pn=$(this).attr("pn");
location.href = replaceOrAddParamVal(location.href,"page",pn);
return false;
});
//替换指定请求路径中的请求参数,如果参数已经存在就直接替换,如果参数不存在就追加参数
function replaceOrAddParamVal(url,paramName,replaceVal,forceAdd=false){
var oUrl = url.toString();
//旧请求路径中有了就进行替换,没有对应参数就添加参数
if (oUrl.indexOf(paramName+'=') != -1){
if(forceAdd){
return addURIParam(oUrl,paramName,replaceVal);
}else{
var re = eval('/('+paramName+'=)([^&]*)/gi');
var nUrl = oUrl.replace(re,paramName+'='+replaceVal);
return nUrl;
}
}else{
return addURIParam(oUrl,paramName,replaceVal);
}
}
//新增参数
function addURIParam(oUrl,paramName,replaceVal){
//添加参数
var nUrl="";
//检查旧链接中有没有问号,有问号就追加参数,没有就新增问号并添加参数
if(oUrl.indexOf("?")!=-1){
nUrl = oUrl+"&"+paramName+"="+replaceVal;
}else{
nUrl = oUrl+"?"+paramName+"="+replaceVal;
}
return nUrl;
}
</script>
一般用户同步通知还没有到服务器的异步通知就到了
异步通知的参数
用户成功支付以后支付宝会每隔几秒就给我们提供的服务器异步通知路径notify_url
发起POST请求,将支付结果作为参数通知商户,所有的参数和介绍都在https://opendocs.alipay.com/open/270/105902?pathHash=d5cd617e
trade_status
:交易状态,交易状态包括TRADE_SUCCESS
[交易支付成功]、TRADE_CLOSED
[未付款交易关闭或支付后全额退款]、TRADE_FINISHED
[交易结束,不可退款]、WAIT_BUYER_PAY
[交易创建,等待买家付款]
out_trade_no
:订单号
trade_no
:支付宝交易号,相当于支付宝为此次交易设置的订单号
使用Vo类封装支付宝的服务端响应参数
注意请求参数会被自动封装到对应的VO类中,除此以外我们还可以通过httpServletRequest.getParameterMap()
来从请求中直接获取参数集合
注意支付宝返回的notify_time
数据类型为String
类型,直接将该数据类型转成Date
类型会报错,我们需要在配置文件中配置spring.mvc.date-format=yyyy-MM-dd HH:mm:ss
日期类型才能将指定格式的时间字符串格式化为Date类型;此外我们还可以在对应属性上使用注解@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
,SpringBoot2.4
以上版本叫spring.mvc.format.date
xxxxxxxxxx
package com.atguigu.gulimall.order.vo.pay;
import lombok.Data;
import lombok.ToString;
import java.util.time.Date;
public class PayAsyncVo {
private String gmt_create;
private String charset;
private String gmt_payment;
private Date notify_time;
private String subject;
private String sign;
private String buyer_id;//支付者的id
private String body;//订单的信息
private String invoice_amount;//支付金额
private String version;
private String notify_id;//通知id
private String fund_bill_list;
private String notify_type;//通知类型; trade_status_sync
private String out_trade_no;//订单号
private String total_amount;//支付的总额
private String trade_status;//交易状态 TRADE_SUCCESS
private String trade_no;//流水号
private String auth_app_id;//
private String receipt_amount;//商家收到的款
private String point_amount;//
private String app_id;//应用id
private String buyer_pay_amount;//最终支付的金额
private String sign_type;//签名类型
private String seller_id;//商家的id
}
处理支付宝的支付返回结果
1️⃣:在数据库表oms_payment_info
记录了订单支付流水,字段包括order_sn
[订单号]、alipay_trade_no
[支付宝交易号]、total_amount
[支付金额]、subject
[订单主题]、payment_status
[支付状态],callbackTime
[异步通知回调时间]记录流水的作用是每隔一个月就可以和支付宝的支付流水进行一个对照,第一步就是根据支付宝的返回结果直接保存一份支付流水
这里限制了一个订单只有一个流水,限制了id
为主键、order_sn
订单号为唯一键索引、alipay_trade_no
支付宝交易号为唯一键索引
把数据库表订单号的长度更改为64
2️⃣:验证支付宝签名,作用是确保该请求是支付宝给我们发送的数据,验签流程可以参考支付宝的DEMO中的notify_url.jsp
[notify_url.jsp
]
xxxxxxxxxx
<%page language="java" contentType="text/html; charset=utf-8"
pageEncoding="utf-8"%>
<%page import="java.util.*"%>
<%page import="java.util.Map"%>
<%page import="com.alipay.config.*"%>
<%page import="com.alipay.api.*"%>
<%page import="com.alipay.api.internal.util.*"%>
<%
/* *
* 功能:支付宝服务器异步通知页面
* 日期:2017-03-30
* 说明:
* 以下代码只是为了方便商户测试而提供的样例代码,商户可以根据自己网站的需要,按照技术文档编写,并非一定要使用该代码。
* 该代码仅供学习和研究支付宝接口使用,只是提供一个参考。
*************************页面功能说明*************************
* 创建该页面文件时,请留心该页面文件中无任何HTML代码及空格。
* 该页面不能在本机电脑测试,请到服务器上做测试。请确保外部可以访问该页面。
* 如果没有收到该页面返回的 success
* 建议该页面只做支付成功的业务逻辑处理,退款的处理请以调用退款查询接口的结果为准。
*/
//获取支付宝POST过来反馈信息,将支付宝返回的所有请求参数都封装到一个Map集合
Map<String,String> params = new HashMap<String,String>();
Map<String,String[]> requestParams = request.getParameterMap();
for (Iterator<String> iter = requestParams.keySet().iterator(); iter.hasNext();) {
String name = (String) iter.next();
String[] values = (String[]) requestParams.get(name);
String valueStr = "";
for (int i = 0; i < values.length; i++) {
valueStr = (i == values.length - 1) ? valueStr + values[i]
: valueStr + values[i] + ",";
}
//乱码解决,这段代码在出现乱码时使用
//注意这行代码只能在乱码的时候再执行
//valueStr = new String(valueStr.getBytes("ISO-8859-1"), "utf-8");
params.put(name, valueStr);
}
//调用alipaySignature.rsaCheckV1()方法来进行验签,这里面使用了AlipayConfig配置文件中的属性,我们是单独封装了一个AlipayTemplate,注意进行替换,验签会返回验证的结果,如果验签成功signVerified为true说明这是支付宝发回来的数据可以执行业务方法,如果验签失败说明signVerified为false说明这个数据有问题,不是支付宝返回的数据不能执行业务方法
boolean signVerified = AlipaySignature.rsaCheckV1(params, AlipayConfig.alipay_public_key, AlipayConfig.charset, AlipayConfig.sign_type); //调用SDK验证签名
//——请在这里编写您的程序(以下代码仅作参考)——
/* 实际验证过程建议商户务必添加以下校验:
1、需要验证该通知数据中的out_trade_no是否为商户系统中创建的订单号,
2、判断total_amount是否确实为该订单的实际金额(即商户订单创建时的金额),
3、校验通知中的seller_id(或者seller_email) 是否为out_trade_no这笔单据的对应的操作方(有的时候,一个商户可能有多个seller_id/seller_email)
4、验证app_id是否为该商户本身。
*/
if(signVerified) {//验证成功
//商户订单号
String out_trade_no = new String(request.getParameter("out_trade_no").getBytes("ISO-8859-1"),"UTF-8");
//支付宝交易号
String trade_no = new String(request.getParameter("trade_no").getBytes("ISO-8859-1"),"UTF-8");
//交易状态
String trade_status = new String(request.getParameter("trade_status").getBytes("ISO-8859-1"),"UTF-8");
if(trade_status.equals("TRADE_FINISHED")){
//判断该笔订单是否在商户网站中已经做过处理
//如果没有做过处理,根据订单号(out_trade_no)在商户网站的订单系统中查到该笔订单的详细,并执行商户的业务程序
//如果有做过处理,不执行商户的业务程序
//注意:
//退款日期超过可退款期限后(如三个月可退款),支付宝系统发送该交易状态通知
}else if (trade_status.equals("TRADE_SUCCESS")){
//判断该笔订单是否在商户网站中已经做过处理
//如果没有做过处理,根据订单号(out_trade_no)在商户网站的订单系统中查到该笔订单的详细,并执行商户的业务程序
//如果有做过处理,不执行商户的业务程序
//注意:
//付款完成后,支付宝系统发送该交易状态通知
}
out.println("success");
}else {//验证失败
out.println("fail");
//调试用,写文本函数记录程序运行情况是否正常
//String sWord = AlipaySignature.getSignCheckContentV1(params);
//AlipayConfig.logResult(sWord);
}
//——请在这里编写您的程序(以上代码仅作参考)——
%>
[业务方法]
这里只要验签成功就更改订单状态
xxxxxxxxxx
//获取支付宝POST过来反馈信息,将支付宝返回的所有请求参数都封装到一个Map集合
Map<String,String> params = new HashMap<String,String>();
Map<String,String[]> requestParams = request.getParameterMap();
for (Iterator<String> iter = requestParams.keySet().iterator(); iter.hasNext();) {
String name = (String) iter.next();
String[] values = (String[]) requestParams.get(name);
String valueStr = "";
for (int i = 0; i < values.length; i++) {
valueStr = (i == values.length - 1) ? valueStr + values[i]
: valueStr + values[i] + ",";
}
//乱码解决,这段代码在出现乱码时使用
valueStr = new String(valueStr.getBytes("ISO-8859-1"), "utf-8");
params.put(name, valueStr);
}
//调用alipaySignature.rsaCheckV1()方法来进行验签,这里面使用了AlipayConfig配置文件中的属性,我们是单独封装了一个AlipayTemplate,注意进行替换,验签会返回验证的结果,如果验签成功signVerified为true说明这是支付宝发回来的数据可以执行业务方法,如果验签失败说明signVerified为false说明这个数据有问题,不是支付宝返回的数据不能执行业务方法
boolean signVerified = AlipaySignature.rsaCheckV1(params, AlipayConfig.alipay_public_key, AlipayConfig.charset, AlipayConfig.sign_type); //调用SDK验证签名
3️⃣:修改订单状态,支付状态payment_status
中的状态TRADE_SUCCESS
的通知触发条件是商户签约的产品支持退款功能且买家付款成功,状态TRADE_FINISHED
的通知触发条件是商户签约产品不支持退款功能且买家付款成功,只要支付状态是这两种状态,我们就将用户的订单状态修改为已支付
代码实现
[控制器方法]
xxxxxxxxxx
/**
* @return {@link String }
* @描述 支付宝支付回调
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/05
* @since 1.0.0
*/
"/paid/notify") (
public String paidNotify( AlipayAsyncVo alipayAsync, HttpServletRequest request){
System.out.println("notify");
try {
orderWebService.paidNotify(alipayAsync,request);
return "success";
} catch (AlipayApiException e) {
e.printStackTrace();
}
return "failed";
}
[业务方法]
xxxxxxxxxx
/**
* @param alipayAsync
* @描述 支付宝异步回调通知业务方法
* @author Earl
* @version 1.0.0
* @创建日期 2024/12/06
* @since 1.0.0
*/
public void paidNotify(AlipayAsyncVo alipayAsync, HttpServletRequest request) throws AlipayApiException {
//1. 验证支付宝异步回调签名,验签不通过直接抛异常
if(!alipayTemplate.verifySignature(request)){
throw new RRException(StatusCode.ALIPAY_NOTIFY_SIGNATURE_VERIFY_EXCEPTION.getMsg(),
StatusCode.ALIPAY_NOTIFY_SIGNATURE_VERIFY_EXCEPTION.getCode());
}
//2. 记录支付流水
PaymentInfoEntity paymentInfoEntity = new PaymentInfoEntity();
paymentInfoEntity.setOrderSn(alipayAsync.getOut_trade_no());
paymentInfoEntity.setAlipayTradeNo(alipayAsync.getTrade_no());
paymentInfoEntity.setTotalAmount(new BigDecimal(alipayAsync.getTotal_amount()));
paymentInfoEntity.setSubject(alipayAsync.getSubject());
paymentInfoEntity.setPaymentStatus(alipayAsync.getTrade_status());
paymentInfoEntity.setCallbackTime(alipayAsync.getNotify_time());
paymentInfoService.save(paymentInfoEntity);
//3. 更改订单状态
if(alipayAsync.getTrade_status().equals(OrderConstant.AlipayTradeStatus.TRADE_SUCCESS.getMsg())||
alipayAsync.getTrade_status().equals(OrderConstant.AlipayTradeStatus.TRADE_FINISHED.getMsg())){
orderService.updateOrderStatus(alipayAsync.getOut_trade_no());
}
}
我们自己的系统超时未支付就会自动关闭订单并且释放库存,但是我们无法控制用户的支付行为,也无法控制支付宝的收款行为,就可能发生商户订单已经关闭,库存已经解锁;但是此时用户直接打开支付宝的收银页面并且支付了,此时支付宝仍然能异步回调我们的接口并验签成功更改订单状态,但是此时我们已经关闭了订单,库存也释放掉了,万一库存没有了,此时订单支付成功就会导致纠纷
支付宝提供了收单功能,一旦超出我们设定的时间用户没有支付,支付宝就会将对用户的收银功能关闭掉
在支付宝的API列表中可以看到支付宝的可调用接口,点进统一收单下单并支付页面接口可以查看接口的详细信息,包括可以携带的参数和参数说明
在该参数表中可以传参一个time_expire
,参数说明为绝对超时时间,格式为yyyy-MM-dd HH:mm:ss
,只要到了指定的绝对时间以后订单都无法再支付
该参数列表还可以传参一个timeout_express
,参数说明为相对超时时间,可取值返回是1分钟到15天,单位有m
分钟、h
小时、d
天、1c
当天[1C的含义是无论交易在当天何时创建都会在0点关闭],注意该参数值不接收小数,我们使用该参数设置关单时间为1m,与订单关闭时间相同,感觉这种实现不好,还是创建订单就指定绝对支付时间比较靠谱
有可能订单最后一刻支付,订单在服务器异步通知的过程中商户订单关闭库存解锁后,异步通知才到;为了避免这个问题,支付宝提供了手动收单功能,用户只要在关闭订单的同时向支付宝发起收单请求,支付宝就会支付失败,怪不得用户同步通知页面比服务器通知页面慢很多,只要服务器通知没到,用户同步通知就到不了,收单的代码示例在DEMO中的alipay.trade.close.jsp
中
[alipay.trade.close.jsp
]
xxxxxxxxxx
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<title>交易关闭</title>
</head>
<%page language="java" contentType="text/html; charset=utf-8" pageEncoding="utf-8"%>
<%page import="com.alipay.config.*"%>
<%page import="com.alipay.api.*"%>
<%page import="com.alipay.api.request.*"%>
<%
//获得初始化的AlipayClient
AlipayClient alipayClient = new DefaultAlipayClient(AlipayConfig.gatewayUrl, AlipayConfig.app_id, AlipayConfig.merchant_private_key, "json", AlipayConfig.charset, AlipayConfig.alipay_public_key, AlipayConfig.sign_type);
//设置请求参数
AlipayTradeCloseRequest alipayRequest = new AlipayTradeCloseRequest();
//商户订单号,商户网站订单系统中唯一订单号
String out_trade_no = new String(request.getParameter("WIDTCout_trade_no").getBytes("ISO-8859-1"),"UTF-8");
//支付宝交易号
String trade_no = new String(request.getParameter("WIDTCtrade_no").getBytes("ISO-8859-1"),"UTF-8");
//请二选一设置
alipayRequest.setBizContent("{\"out_trade_no\":\""+ out_trade_no +"\"," +"\"trade_no\":\""+ trade_no +"\"}");
//执行收单请求
String result = alipayClient.execute(alipayRequest).getBody();
//输出
out.println(result);
%>
<body>
</body>
</html>
[设置AlipayTemplate
收单方法]
老师没做手动收单功能,用的只是自动收单,不是线上要测试出这个效果都难
每天晚上闲时调用支付宝的交易查询接口,该接口DEMO中也有,也可以查看支付宝提供的API列表查看,下载支付宝对账单,对当天支付订单一一对账
常见的加密算法:对称加密、非对称加密、
对称加密
原理示意图
原理:客户端使用密钥A对明文加密生成密文,服务端使用同一把密钥A对密文解密生成明文
核心:加密和解密使用的是同一把密钥
方案:DES、3DES[TripleDES]、AES、RC2、RC4、RC5、BlowFish
缺陷:一旦密钥被截取或者破解,网络传输中就能随意获取篡改用户请求明文来更改服务端数据,既然知道加密规则就一定能通过密文获取到明文,即便加密解密过程不同也可以通过彩虹表暴力匹配
应用场景
这种加密方式非常不安全,在金融领域根本不能使用
非对称加密
原理示意图
原理:客户端使用密钥A对明文进行加密生成密文,密文只有在服务器中使用密钥B才能解密出明文,使用密钥A无法再解密出明文;服务端的响应数据使用密钥C加密生成密文,客户端使用密钥D解密生成明文,只有使用密钥C加密的密文才能使用密钥D解密,使用密钥D加密的密文无法被密钥D解密,这样即使第三方截取响应内容篡改以后加密的密文无法被客户端正常解密;密钥B和密钥C都只存在服务器内,不存在丢失的风险
非对称加密算法中,公钥私钥是相对于密钥的生成者来说的,存放在生产者手里只提供给生产者使用的就是私钥,发布出去给各个客户端使用的就是公钥[这里有歧义,复习Nginx的时候确认一下,我查了一下网上是发布出去给客户端使用的就是公钥,但是支付宝的加密模型中是加密的是私钥,解密的是公钥]
注意请求到响应两个过程一共是两对密钥四把钥匙,RSA算法一次生成的密钥就是一对
核心:请求和响应使用不同的两对密钥,加密密钥加密的密文只能被解密密钥解密,解密密钥只能解密加密密钥加密的密文
方案:RSA[SHA1金融领域非常常用的非对称加密算法]、Elgamal、RSA2[SHA256]
缺陷:
非对称加密算法仍然存在缺陷,一些不法组织可能给用户弄一个自己的客户端模拟实际的客户端,用户将服务器需要的数据直接明文传递给不法组织的服务器,不法组织的服务器拿着用户的数据篡改以后来代替用户向实际的服务器发起请求,收到响应以后解密篡改响应数据并响应给用户
不法组织可以直接获取到密文,可以在用户发起请求的同时用已知的参数对应密文替换掉当前请求的参数密文[比如在用户的支付金额后面加两个0来让用户多转一点钱]
这种替换参数密文的问题可以通过商户公钥结合请求参数和时间戳等参数通过摘要算法生成一个加密的签名,通过该签名可以验证用户的请求参数是否在网络传输过程中被篡改过[这里要考虑不法组织能否获取到商户的公钥,但是老师显然是不考虑这个问题的],老师的意思就是直接传递参数密文可以直接通过密文来替换参数,我们通过商户特有参数和请求参数通过商户公钥加密,在服务端收到密文解密以后获取到请求参数后还能检验请求参数是否被篡改过,比如使用MD5算法来检查请求参数是否被篡改过